diff options
author | ctiller <ctiller@google.com> | 2015-01-07 14:03:30 -0800 |
---|---|---|
committer | Tim Emiola <temiola@google.com> | 2015-01-08 13:01:56 -0800 |
commit | 1a277ecd93e908a47a905d323a4a1a77287cede1 (patch) | |
tree | d1648196ca8201d778e73c7c9e59a8ac90613763 | |
parent | 3040cb7c434e83c0e70839ac20218f1c2d77e1eb (diff) |
Remove libevent.
Fixed any exposed bugs across the stack.
Add a poll() based implementation. Heavily leverages pollset infrastructure to allow small polls to be the norm. Exposes a mechanism to plug in epoll/kqueue for platforms where we have them.
Simplify iomgr callbacks to return one bit of success or failure (instead of the multi valued result that was mostly unused previously). This will ease the burden on new implementations, and the previous system provided no real value anyway.
Removed timeouts on endpoint read/write routines. This simplifies porting burden by providing a more orthogonal interface, and the functionality can always be replicated when desired by using an alarm combined with endpoint_shutdown. I'm fairly certain we ended up with this interface because it was convenient to do from libevent.
Things that need attention still:
- adding an fd to a pollset is O(n^2) - but this is probably ok given that we'll not
use this for multipolling once platform specific implementations are added.
- we rely on the backup poller too often - especially for SSL handshakes and for client
connection establishment we should have a better mechanism ([] []
- Linux needs to use epoll for multiple fds, FreeBSD variants (including
Darwin) need to use kqueue. ([] []
- Linux needs to use eventfd for poll kicking. ([]
Change on 2015/01/07 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=83461069
58 files changed, 1806 insertions, 1417 deletions
@@ -1229,9 +1229,11 @@ LIBGRPC_SRC = \ src/core/iomgr/alarm_heap.c \ src/core/iomgr/endpoint.c \ src/core/iomgr/endpoint_pair_posix.c \ - src/core/iomgr/iomgr_libevent.c \ - src/core/iomgr/iomgr_libevent_use_threads.c \ - src/core/iomgr/pollset.c \ + src/core/iomgr/fd_posix.c \ + src/core/iomgr/iomgr.c \ + src/core/iomgr/iomgr_posix.c \ + src/core/iomgr/pollset_multipoller_with_poll_posix.c \ + src/core/iomgr/pollset_posix.c \ src/core/iomgr/resolve_address_posix.c \ src/core/iomgr/sockaddr_utils.c \ src/core/iomgr/socket_utils_common_posix.c \ @@ -1277,8 +1279,8 @@ LIBGRPC_SRC = \ src/core/transport/chttp2/stream_encoder.c \ src/core/transport/chttp2/stream_map.c \ src/core/transport/chttp2/timeout_encoding.c \ - src/core/transport/chttp2_transport.c \ src/core/transport/chttp2/varint.c \ + src/core/transport/chttp2_transport.c \ src/core/transport/metadata.c \ src/core/transport/stream_op.c \ src/core/transport/transport.c \ @@ -1379,9 +1381,11 @@ LIBGRPC_UNSECURE_SRC = \ src/core/iomgr/alarm_heap.c \ src/core/iomgr/endpoint.c \ src/core/iomgr/endpoint_pair_posix.c \ - src/core/iomgr/iomgr_libevent.c \ - src/core/iomgr/iomgr_libevent_use_threads.c \ - src/core/iomgr/pollset.c \ + src/core/iomgr/fd_posix.c \ + src/core/iomgr/iomgr.c \ + src/core/iomgr/iomgr_posix.c \ + src/core/iomgr/pollset_multipoller_with_poll_posix.c \ + src/core/iomgr/pollset_posix.c \ src/core/iomgr/resolve_address_posix.c \ src/core/iomgr/sockaddr_utils.c \ src/core/iomgr/socket_utils_common_posix.c \ @@ -1427,8 +1431,8 @@ LIBGRPC_UNSECURE_SRC = \ src/core/transport/chttp2/stream_encoder.c \ src/core/transport/chttp2/stream_map.c \ src/core/transport/chttp2/timeout_encoding.c \ - src/core/transport/chttp2_transport.c \ src/core/transport/chttp2/varint.c \ + src/core/transport/chttp2_transport.c \ src/core/transport/metadata.c \ src/core/transport/stream_op.c \ src/core/transport/transport.c \ diff --git a/build.json b/build.json index e0bf280635..e0e05c8eaf 100644 --- a/build.json +++ b/build.json @@ -35,9 +35,11 @@ "src/core/iomgr/alarm_heap.c", "src/core/iomgr/endpoint.c", "src/core/iomgr/endpoint_pair_posix.c", - "src/core/iomgr/iomgr_libevent.c", - "src/core/iomgr/iomgr_libevent_use_threads.c", - "src/core/iomgr/pollset.c", + "src/core/iomgr/fd_posix.c", + "src/core/iomgr/iomgr.c", + "src/core/iomgr/iomgr_posix.c", + "src/core/iomgr/pollset_multipoller_with_poll_posix.c", + "src/core/iomgr/pollset_posix.c", "src/core/iomgr/resolve_address_posix.c", "src/core/iomgr/sockaddr_utils.c", "src/core/iomgr/socket_utils_common_posix.c", @@ -83,8 +85,8 @@ "src/core/transport/chttp2/stream_encoder.c", "src/core/transport/chttp2/stream_map.c", "src/core/transport/chttp2/timeout_encoding.c", - "src/core/transport/chttp2_transport.c", "src/core/transport/chttp2/varint.c", + "src/core/transport/chttp2_transport.c", "src/core/transport/metadata.c", "src/core/transport/stream_op.c", "src/core/transport/transport.c", @@ -120,10 +122,12 @@ "src/core/iomgr/alarm_internal.h", "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", - "src/core/iomgr/iomgr_completion_queue_interface.h", + "src/core/iomgr/fd_posix.h", "src/core/iomgr/iomgr.h", - "src/core/iomgr/iomgr_libevent.h", + "src/core/iomgr/iomgr_internal.h", + "src/core/iomgr/iomgr_posix.h", "src/core/iomgr/pollset.h", + "src/core/iomgr/pollset_posix.h", "src/core/iomgr/resolve_address.h", "src/core/iomgr/sockaddr.h", "src/core/iomgr/sockaddr_posix.h", @@ -148,9 +152,9 @@ "src/core/surface/server.h", "src/core/surface/surface_trace.h", "src/core/transport/chttp2/bin_encoder.h", + "src/core/transport/chttp2/frame.h", "src/core/transport/chttp2/frame_data.h", "src/core/transport/chttp2/frame_goaway.h", - "src/core/transport/chttp2/frame.h", "src/core/transport/chttp2/frame_ping.h", "src/core/transport/chttp2/frame_rst_stream.h", "src/core/transport/chttp2/frame_settings.h", @@ -163,8 +167,8 @@ "src/core/transport/chttp2/stream_encoder.h", "src/core/transport/chttp2/stream_map.h", "src/core/transport/chttp2/timeout_encoding.h", - "src/core/transport/chttp2_transport.h", "src/core/transport/chttp2/varint.h", + "src/core/transport/chttp2_transport.h", "src/core/transport/metadata.h", "src/core/transport/stream_op.h", "src/core/transport/transport.h", diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h index 9e5c9ff2ac..27a7b5529f 100644 --- a/include/grpc/support/port_platform.h +++ b/include/grpc/support/port_platform.h @@ -54,6 +54,7 @@ #define GPR_CPU_LINUX 1 #define GPR_GCC_SYNC 1 #define GPR_LIBEVENT 1 +#define GPR_POSIX_MULTIPOLL_WITH_POLL 1 #define GPR_POSIX_SOCKET 1 #define GPR_POSIX_SOCKETADDR 1 #define GPR_POSIX_SOCKETUTILS 1 @@ -65,6 +66,7 @@ #define GPR_GCC_ATOMIC 1 #define GPR_LIBEVENT 1 #define GPR_LINUX 1 +#define GPR_POSIX_MULTIPOLL_WITH_POLL 1 #define GPR_POSIX_SOCKET 1 #define GPR_POSIX_SOCKETADDR 1 #define GPR_POSIX_STRING 1 @@ -80,6 +82,7 @@ #define GPR_GCC_ATOMIC 1 #define GPR_LIBEVENT 1 #define GPR_POSIX_LOG 1 +#define GPR_POSIX_MULTIPOLL_WITH_POLL 1 #define GPR_POSIX_SOCKET 1 #define GPR_POSIX_SOCKETADDR 1 #define GPR_POSIX_SOCKETUTILS 1 diff --git a/include/grpc/support/time.h b/include/grpc/support/time.h index 5a57d94768..41d1e88dc9 100644 --- a/include/grpc/support/time.h +++ b/include/grpc/support/time.h @@ -94,6 +94,8 @@ gpr_timespec gpr_time_from_seconds(long x); gpr_timespec gpr_time_from_minutes(long x); gpr_timespec gpr_time_from_hours(long x); +gpr_int32 gpr_time_to_millis(gpr_timespec timespec); + /* Return 1 if two times are equal or within threshold of each other, 0 otherwise */ int gpr_time_similar(gpr_timespec a, gpr_timespec b, gpr_timespec threshold); diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c index e67b823697..3778f4fb88 100644 --- a/src/core/channel/child_channel.c +++ b/src/core/channel/child_channel.c @@ -85,19 +85,19 @@ static void lb_channel_op(grpc_channel_element *elem, grpc_channel_op *op) { lb_channel_data *chand = elem->channel_data; grpc_channel_element *back; + int calling_back = 0; switch (op->dir) { case GRPC_CALL_UP: gpr_mu_lock(&chand->mu); back = chand->back; - if (back) chand->calling_back++; + if (back) { + chand->calling_back++; + calling_back = 1; + } gpr_mu_unlock(&chand->mu); if (back) { back->filter->channel_op(chand->back, elem, op); - gpr_mu_lock(&chand->mu); - chand->calling_back--; - gpr_cv_broadcast(&chand->cv); - gpr_mu_unlock(&chand->mu); } else if (op->type == GRPC_TRANSPORT_GOAWAY) { gpr_slice_unref(op->data.goaway.message); } @@ -107,23 +107,27 @@ static void lb_channel_op(grpc_channel_element *elem, break; } + gpr_mu_lock(&chand->mu); switch (op->type) { case GRPC_TRANSPORT_CLOSED: - gpr_mu_lock(&chand->mu); chand->disconnected = 1; maybe_destroy_channel(grpc_channel_stack_from_top_element(elem)); - gpr_mu_unlock(&chand->mu); break; case GRPC_CHANNEL_GOAWAY: - gpr_mu_lock(&chand->mu); chand->sent_goaway = 1; - gpr_mu_unlock(&chand->mu); break; case GRPC_CHANNEL_DISCONNECT: case GRPC_TRANSPORT_GOAWAY: case GRPC_ACCEPT_CALL: break; } + + if (calling_back) { + chand->calling_back--; + gpr_cv_signal(&chand->cv); + maybe_destroy_channel(grpc_channel_stack_from_top_element(elem)); + } + gpr_mu_unlock(&chand->mu); } /* Constructor for call_data */ @@ -177,7 +181,9 @@ const grpc_channel_filter grpc_child_channel_top_filter = { #define LINK_BACK_ELEM_FROM_CALL(call) grpc_call_stack_element((call), 0) -static void finally_destroy_channel(void *c, grpc_iomgr_cb_status status) { +static void finally_destroy_channel(void *c, int success) { + /* ignore success or not... this is a destruction callback and will only + happen once - the only purpose here is to release resources */ grpc_child_channel *channel = c; lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data; /* wait for the initiator to leave the mutex */ @@ -187,7 +193,7 @@ static void finally_destroy_channel(void *c, grpc_iomgr_cb_status status) { gpr_free(channel); } -static void send_farewells(void *c, grpc_iomgr_cb_status status) { +static void send_farewells(void *c, int success) { grpc_child_channel *channel = c; grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel); lb_channel_data *chand = lbelem->channel_data; @@ -221,7 +227,7 @@ static void send_farewells(void *c, grpc_iomgr_cb_status status) { static void maybe_destroy_channel(grpc_child_channel *channel) { lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data; if (chand->destroyed && chand->disconnected && chand->active_calls == 0 && - !chand->sending_farewell) { + !chand->sending_farewell && !chand->calling_back) { grpc_iomgr_add_callback(finally_destroy_channel, channel); } else if (chand->destroyed && !chand->disconnected && chand->active_calls == 0 && !chand->sending_farewell && @@ -249,14 +255,16 @@ grpc_child_channel *grpc_child_channel_create( return stk; } -void grpc_child_channel_destroy(grpc_child_channel *channel) { +void grpc_child_channel_destroy(grpc_child_channel *channel, + int wait_for_callbacks) { grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel); lb_channel_data *chand = lbelem->channel_data; gpr_mu_lock(&chand->mu); - while (chand->calling_back) { + while (wait_for_callbacks && chand->calling_back) { gpr_cv_wait(&chand->cv, &chand->mu, gpr_inf_future); } + chand->back = NULL; chand->destroyed = 1; maybe_destroy_channel(channel); diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h index 9fb2a17e29..3ba4c1b8a9 100644 --- a/src/core/channel/child_channel.h +++ b/src/core/channel/child_channel.h @@ -53,7 +53,8 @@ void grpc_child_channel_handle_op(grpc_child_channel *channel, grpc_channel_op *op); grpc_channel_element *grpc_child_channel_get_bottom_element( grpc_child_channel *channel); -void grpc_child_channel_destroy(grpc_child_channel *channel); +void grpc_child_channel_destroy(grpc_child_channel *channel, + int wait_for_callbacks); grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel, grpc_call_element *parent); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index fd883a08ca..46283835a0 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -294,14 +294,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, } } -static void finally_destroy_channel(void *arg, grpc_iomgr_cb_status status) { - grpc_child_channel_destroy(arg); -} - -static void destroy_channel_later(grpc_child_channel *channel) { - grpc_iomgr_add_callback(finally_destroy_channel, channel); -} - static void channel_op(grpc_channel_element *elem, grpc_channel_element *from_elem, grpc_channel_op *op) { channel_data *chand = elem->channel_data; @@ -317,7 +309,7 @@ static void channel_op(grpc_channel_element *elem, gpr_mu_unlock(&chand->mu); if (child_channel) { grpc_child_channel_handle_op(child_channel, op); - destroy_channel_later(child_channel); + grpc_child_channel_destroy(child_channel, 1); } else { gpr_slice_unref(op->data.goaway.message); } @@ -329,7 +321,7 @@ static void channel_op(grpc_channel_element *elem, chand->active_child = NULL; gpr_mu_unlock(&chand->mu); if (child_channel) { - destroy_channel_later(child_channel); + grpc_child_channel_destroy(child_channel, 1); } break; case GRPC_TRANSPORT_GOAWAY: @@ -344,7 +336,7 @@ static void channel_op(grpc_channel_element *elem, } gpr_mu_unlock(&chand->mu); if (child_channel) { - destroy_channel_later(child_channel); + grpc_child_channel_destroy(child_channel, 0); } gpr_slice_unref(op->data.goaway.message); break; @@ -360,7 +352,7 @@ static void channel_op(grpc_channel_element *elem, } gpr_mu_unlock(&chand->mu); if (child_channel) { - destroy_channel_later(child_channel); + grpc_child_channel_destroy(child_channel, 0); } break; default: @@ -445,7 +437,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) { grpc_transport_setup_cancel(chand->transport_setup); if (chand->active_child) { - grpc_child_channel_destroy(chand->active_child); + grpc_child_channel_destroy(chand->active_child, 1); chand->active_child = NULL; } @@ -549,7 +541,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( gpr_free(child_filters); if (old_active) { - grpc_child_channel_destroy(old_active); + grpc_child_channel_destroy(old_active, 1); } return result; diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c index b1194e278d..ebaf816902 100644 --- a/src/core/channel/client_setup.c +++ b/src/core/channel/client_setup.c @@ -166,8 +166,7 @@ int grpc_client_setup_request_should_continue(grpc_client_setup_request *r) { return result; } -static void backoff_alarm_done(void *arg /* grpc_client_setup */, - grpc_iomgr_cb_status status) { +static void backoff_alarm_done(void *arg /* grpc_client_setup */, int success) { grpc_client_setup *s = arg; grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request)); r->setup = s; @@ -177,7 +176,7 @@ static void backoff_alarm_done(void *arg /* grpc_client_setup */, gpr_mu_lock(&s->mu); s->active_request = r; s->in_alarm = 0; - if (status != GRPC_CALLBACK_SUCCESS) { + if (!success) { if (0 == --s->refs) { gpr_mu_unlock(&s->mu); destroy_setup(s); diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 06d73e40f5..2143eeb63d 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -101,12 +101,11 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices, switch (status) { case GRPC_ENDPOINT_CB_OK: - grpc_endpoint_notify_on_read(req->ep, on_read, req, gpr_inf_future); + grpc_endpoint_notify_on_read(req->ep, on_read, req); break; case GRPC_ENDPOINT_CB_EOF: case GRPC_ENDPOINT_CB_ERROR: case GRPC_ENDPOINT_CB_SHUTDOWN: - case GRPC_ENDPOINT_CB_TIMED_OUT: if (!req->have_read_byte) { next_address(req); } else { @@ -123,7 +122,7 @@ done: static void on_written(internal_request *req) { gpr_log(GPR_DEBUG, "%s", __FUNCTION__); - grpc_endpoint_notify_on_read(req->ep, on_read, req, gpr_inf_future); + grpc_endpoint_notify_on_read(req->ep, on_read, req); } static void done_write(void *arg, grpc_endpoint_cb_status status) { @@ -136,7 +135,6 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) { case GRPC_ENDPOINT_CB_EOF: case GRPC_ENDPOINT_CB_SHUTDOWN: case GRPC_ENDPOINT_CB_ERROR: - case GRPC_ENDPOINT_CB_TIMED_OUT: next_address(req); break; } @@ -145,8 +143,8 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) { static void start_write(internal_request *req) { gpr_slice_ref(req->request_text); gpr_log(GPR_DEBUG, "%s", __FUNCTION__); - switch (grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req, - gpr_inf_future)) { + switch ( + grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) { case GRPC_ENDPOINT_WRITE_DONE: on_written(req); break; diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index b7238f716a..2664879323 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -71,8 +71,8 @@ static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ static shard_type *g_shard_queue[NUM_SHARDS]; -static int run_some_expired_alarms(gpr_timespec now, - grpc_iomgr_cb_status status); +static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, + gpr_timespec *next, int success); static gpr_timespec compute_min_deadline(shard_type *shard) { return grpc_alarm_heap_is_empty(&shard->heap) @@ -102,7 +102,7 @@ void grpc_alarm_list_init(gpr_timespec now) { void grpc_alarm_list_shutdown() { int i; - while (run_some_expired_alarms(gpr_inf_future, GRPC_CALLBACK_CANCELLED)) + while (run_some_expired_alarms(NULL, gpr_inf_future, NULL, 0)) ; for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -233,7 +233,7 @@ void grpc_alarm_cancel(grpc_alarm *alarm) { gpr_mu_unlock(&shard->mu); if (triggered) { - alarm->cb(alarm->cb_arg, GRPC_CALLBACK_CANCELLED); + alarm->cb(alarm->cb_arg, 0); } } @@ -299,8 +299,8 @@ static size_t pop_alarms(shard_type *shard, gpr_timespec now, return n; } -static int run_some_expired_alarms(gpr_timespec now, - grpc_iomgr_cb_status status) { +static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, + gpr_timespec *next, int success) { size_t n = 0; size_t i; grpc_alarm *alarms[MAX_ALARMS_PER_CHECK]; @@ -329,19 +329,35 @@ static int run_some_expired_alarms(gpr_timespec now, note_deadline_change(g_shard_queue[0]); } + if (next) { + *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline); + } + gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_checker_mu); + } else if (next) { + gpr_mu_lock(&g_mu); + *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline); + gpr_mu_unlock(&g_mu); + } + + if (n && drop_mu) { + gpr_mu_unlock(drop_mu); } for (i = 0; i < n; i++) { - alarms[i]->cb(alarms[i]->cb_arg, status); + alarms[i]->cb(alarms[i]->cb_arg, success); + } + + if (n && drop_mu) { + gpr_mu_lock(drop_mu); } return n; } -int grpc_alarm_check(gpr_timespec now) { - return run_some_expired_alarms(now, GRPC_CALLBACK_SUCCESS); +int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) { + return run_some_expired_alarms(drop_mu, now, next, 1); } gpr_timespec grpc_alarm_list_next_timeout() { diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h index e605ff84f9..12b6ab4286 100644 --- a/src/core/iomgr/alarm_internal.h +++ b/src/core/iomgr/alarm_internal.h @@ -34,9 +34,12 @@ #ifndef __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_ #define __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_ +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + /* iomgr internal api for dealing with alarms */ -int grpc_alarm_check(gpr_timespec now); +int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next); void grpc_alarm_list_init(gpr_timespec now); void grpc_alarm_list_shutdown(); diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index f1944bf672..9e5d56389d 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -34,14 +34,16 @@ #include "src/core/iomgr/endpoint.h" void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data, gpr_timespec deadline) { - ep->vtable->notify_on_read(ep, cb, user_data, deadline); + void *user_data) { + ep->vtable->notify_on_read(ep, cb, user_data); } -grpc_endpoint_write_status grpc_endpoint_write( - grpc_endpoint *ep, gpr_slice *slices, size_t nslices, - grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) { - return ep->vtable->write(ep, slices, nslices, cb, user_data, deadline); +grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, + gpr_slice *slices, + size_t nslices, + grpc_endpoint_write_cb cb, + void *user_data) { + return ep->vtable->write(ep, slices, nslices, cb, user_data); } void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index bbd800bea8..ec86d9a146 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -48,8 +48,7 @@ typedef enum grpc_endpoint_cb_status { GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */ GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */ GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */ - GRPC_ENDPOINT_CB_ERROR, /* Call interrupted by socket error */ - GRPC_ENDPOINT_CB_TIMED_OUT /* Call timed out */ + GRPC_ENDPOINT_CB_ERROR /* Call interrupted by socket error */ } grpc_endpoint_cb_status; typedef enum grpc_endpoint_write_status { @@ -66,10 +65,10 @@ typedef void (*grpc_endpoint_write_cb)(void *user_data, struct grpc_endpoint_vtable { void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data, gpr_timespec deadline); + void *user_data); grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices, size_t nslices, grpc_endpoint_write_cb cb, - void *user_data, gpr_timespec deadline); + void *user_data); void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); void (*shutdown)(grpc_endpoint *ep); void (*destroy)(grpc_endpoint *ep); @@ -77,7 +76,7 @@ struct grpc_endpoint_vtable { /* When data is available on the connection, calls the callback with slices. */ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data, gpr_timespec deadline); + void *user_data); /* Write slices out to the socket. @@ -85,9 +84,11 @@ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, returns GRPC_ENDPOINT_WRITE_DONE. Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the connection is ready for more data. */ -grpc_endpoint_write_status grpc_endpoint_write( - grpc_endpoint *ep, gpr_slice *slices, size_t nslices, - grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline); +grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, + gpr_slice *slices, + size_t nslices, + grpc_endpoint_write_cb cb, + void *user_data); /* Causes any pending read/write callbacks to run immediately with GRPC_ENDPOINT_CB_SHUTDOWN status */ diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c new file mode 100644 index 0000000000..3cd2f9a8e0 --- /dev/null +++ b/src/core/iomgr/fd_posix.c @@ -0,0 +1,274 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/fd_posix.h" + +#include <assert.h> +#include <unistd.h> + +#include "src/core/iomgr/iomgr_internal.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> + +enum descriptor_state { NOT_READY, READY, WAITING }; + +static void destroy(grpc_fd *fd) { + grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data); + gpr_mu_destroy(&fd->set_state_mu); + gpr_free(fd->watchers); + gpr_free(fd); + grpc_iomgr_unref(); +} + +static void ref_by(grpc_fd *fd, int n) { + gpr_atm_no_barrier_fetch_add(&fd->refst, n); +} + +static void unref_by(grpc_fd *fd, int n) { + if (gpr_atm_full_fetch_add(&fd->refst, -n) == n) { + destroy(fd); + } +} + +static void do_nothing(void *ignored, int success) {} + +grpc_fd *grpc_fd_create(int fd) { + grpc_fd *r = gpr_malloc(sizeof(grpc_fd)); + grpc_iomgr_ref(); + gpr_atm_rel_store(&r->refst, 1); + gpr_atm_rel_store(&r->readst.state, NOT_READY); + gpr_atm_rel_store(&r->writest.state, NOT_READY); + gpr_mu_init(&r->set_state_mu); + gpr_mu_init(&r->watcher_mu); + gpr_atm_rel_store(&r->shutdown, 0); + r->fd = fd; + r->watchers = NULL; + r->watcher_count = 0; + r->watcher_capacity = 0; + grpc_pollset_add_fd(grpc_backup_pollset(), r); + return r; +} + +int grpc_fd_is_orphaned(grpc_fd *fd) { + return (gpr_atm_acq_load(&fd->refst) & 1) == 0; +} + +static void wake_watchers(grpc_fd *fd) { + size_t i, n; + gpr_mu_lock(&fd->watcher_mu); + n = fd->watcher_count; + for (i = 0; i < n; i++) { + grpc_pollset_force_kick(fd->watchers[i]); + } + gpr_mu_unlock(&fd->watcher_mu); +} + +void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { + fd->on_done = on_done ? on_done : do_nothing; + fd->on_done_user_data = user_data; + ref_by(fd, 1); /* remove active status, but keep referenced */ + wake_watchers(fd); + close(fd->fd); + unref_by(fd, 2); /* drop the reference */ +} + +/* increment refcount by two to avoid changing the orphan bit */ +void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } + +void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } + +typedef struct { + grpc_iomgr_cb_func cb; + void *arg; +} callback; + +static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, + int allow_synchronous_callback) { + if (allow_synchronous_callback) { + cb(arg, success); + } else { + grpc_iomgr_add_delayed_callback(cb, arg, success); + } +} + +static void make_callbacks(callback *callbacks, size_t n, int success, + int allow_synchronous_callback) { + size_t i; + for (i = 0; i < n; i++) { + make_callback(callbacks[i].cb, callbacks[i].arg, success, + allow_synchronous_callback); + } +} + +static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb, + void *arg, int allow_synchronous_callback) { + switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) { + case NOT_READY: + /* There is no race if the descriptor is already ready, so we skip + the interlocked op in that case. As long as the app doesn't + try to set the same upcall twice (which it shouldn't) then + oldval should never be anything other than READY or NOT_READY. We + don't + check for user error on the fast path. */ + st->cb = cb; + st->cb_arg = arg; + if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) { + /* swap was successful -- the closure will run after the next + set_ready call. NOTE: we don't have an ABA problem here, + since we should never have concurrent calls to the same + notify_on function. */ + wake_watchers(fd); + return; + } + /* swap was unsuccessful due to an intervening set_ready call. + Fall through to the READY code below */ + case READY: + assert(gpr_atm_acq_load(&st->state) == READY); + gpr_atm_rel_store(&st->state, NOT_READY); + make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown), + allow_synchronous_callback); + return; + case WAITING: + /* upcallptr was set to a different closure. This is an error! */ + gpr_log(GPR_ERROR, + "User called a notify_on function with a previous callback still " + "pending"); + abort(); + } + gpr_log(GPR_ERROR, "Corrupt memory in &st->state"); + abort(); +} + +static void set_ready_locked(grpc_fd_state *st, callback *callbacks, + size_t *ncallbacks) { + callback *c; + + switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) { + case NOT_READY: + if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) { + /* swap was successful -- the closure will run after the next + notify_on call. */ + return; + } + /* swap was unsuccessful due to an intervening set_ready call. + Fall through to the WAITING code below */ + case WAITING: + assert(gpr_atm_acq_load(&st->state) == WAITING); + c = &callbacks[(*ncallbacks)++]; + c->cb = st->cb; + c->arg = st->cb_arg; + gpr_atm_rel_store(&st->state, NOT_READY); + return; + case READY: + /* duplicate ready, ignore */ + return; + } +} + +static void set_ready(grpc_fd *fd, grpc_fd_state *st, + int allow_synchronous_callback) { + /* only one set_ready can be active at once (but there may be a racing + notify_on) */ + int success; + callback cb; + size_t ncb = 0; + gpr_mu_lock(&fd->set_state_mu); + set_ready_locked(st, &cb, &ncb); + gpr_mu_unlock(&fd->set_state_mu); + success = !gpr_atm_acq_load(&fd->shutdown); + make_callbacks(&cb, ncb, success, allow_synchronous_callback); +} + +void grpc_fd_shutdown(grpc_fd *fd) { + callback cb[2]; + size_t ncb = 0; + gpr_mu_lock(&fd->set_state_mu); + GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown)); + gpr_atm_rel_store(&fd->shutdown, 1); + set_ready_locked(&fd->readst, cb, &ncb); + set_ready_locked(&fd->writest, cb, &ncb); + gpr_mu_unlock(&fd->set_state_mu); + make_callbacks(cb, ncb, 0, 0); +} + +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb, + void *read_cb_arg) { + notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0); +} + +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, + void *write_cb_arg) { + notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0); +} + +gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, + gpr_uint32 read_mask, gpr_uint32 write_mask) { + /* keep track of pollers that have requested our events, in case they change + */ + gpr_mu_lock(&fd->watcher_mu); + if (fd->watcher_capacity == fd->watcher_count) { + fd->watcher_capacity = + GPR_MAX(fd->watcher_capacity + 8, fd->watcher_capacity * 3 / 2); + fd->watchers = gpr_realloc(fd->watchers, + fd->watcher_capacity * sizeof(grpc_pollset *)); + } + fd->watchers[fd->watcher_count++] = pollset; + gpr_mu_unlock(&fd->watcher_mu); + + return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) | + (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0); +} + +void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset) { + size_t r, w, n; + + gpr_mu_lock(&fd->watcher_mu); + n = fd->watcher_count; + for (r = 0, w = 0; r < n; r++) { + if (fd->watchers[r] == pollset) { + fd->watcher_count--; + continue; + } + fd->watchers[w++] = fd->watchers[r]; + } + gpr_mu_unlock(&fd->watcher_mu); +} + +void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) { + set_ready(fd, &fd->readst, allow_synchronous_callback); +} + +void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) { + set_ready(fd, &fd->writest, allow_synchronous_callback); +} diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h new file mode 100644 index 0000000000..232de0c3e0 --- /dev/null +++ b/src/core/iomgr/fd_posix.h @@ -0,0 +1,138 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ +#define __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ + +#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/pollset.h" +#include <grpc/support/atm.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +typedef struct { + grpc_iomgr_cb_func cb; + void *cb_arg; + int success; + gpr_atm state; +} grpc_fd_state; + +typedef struct grpc_fd { + int fd; + /* refst format: + bit0: 1=active/0=orphaned + bit1-n: refcount + meaning that mostly we ref by two to avoid altering the orphaned bit, + and just unref by 1 when we're ready to flag the object as orphaned */ + gpr_atm refst; + + gpr_mu set_state_mu; + gpr_atm shutdown; + + gpr_mu watcher_mu; + grpc_pollset **watchers; + size_t watcher_count; + size_t watcher_capacity; + + grpc_fd_state readst; + grpc_fd_state writest; + + grpc_iomgr_cb_func on_done; + void *on_done_user_data; +} grpc_fd; + +/* Create a wrapped file descriptor. + Requires fd is a non-blocking file descriptor. + This takes ownership of closing fd. */ +grpc_fd *grpc_fd_create(int fd); + +/* Releases fd to be asynchronously destroyed. + on_done is called when the underlying file descriptor is definitely close()d. + If on_done is NULL, no callback will be made. + Requires: *fd initialized; no outstanding notify_on_read or + notify_on_write. */ +void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data); + +/* Begin polling on an fd. + Registers that the given pollset is interested in this fd - so that if read + or writability interest changes, the pollset can be kicked to pick up that + new interest. + Return value is: + (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0) + i.e. a combination of read_mask and write_mask determined by the fd's current + interest in said events. + Polling strategies that do not need to alter their behavior depending on the + fd's current interest (such as epoll) do not need to call this function. */ +gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, + gpr_uint32 read_mask, gpr_uint32 write_mask); +/* Complete polling previously started with grpc_fd_begin_poll */ +void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset); + +/* Return 1 if this fd is orphaned, 0 otherwise */ +int grpc_fd_is_orphaned(grpc_fd *fd); + +/* Cause any current callbacks to error out with GRPC_CALLBACK_CANCELLED. */ +void grpc_fd_shutdown(grpc_fd *fd); + +/* Register read interest, causing read_cb to be called once when fd becomes + readable, on deadline specified by deadline, or on shutdown triggered by + grpc_fd_shutdown. + read_cb will be called with read_cb_arg when *fd becomes readable. + read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable, + GRPC_CALLBACK_TIMED_OUT if the call timed out, + and CANCELLED if the call was cancelled. + + Requires:This method must not be called before the read_cb for any previous + call runs. Edge triggered events are used whenever they are supported by the + underlying platform. This means that users must drain fd in read_cb before + calling notify_on_read again. Users are also expected to handle spurious + events, i.e read_cb is called while nothing can be readable from fd */ +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb, + void *read_cb_arg); + +/* Exactly the same semantics as above, except based on writable events. */ +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, + void *write_cb_arg); + +/* Notification from the poller to an fd that it has become readable or + writable. + If allow_synchronous_callback is 1, allow running the fd callback inline + in this callstack, otherwise register an asynchronous callback and return */ +void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback); +void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback); + +/* Reference counting for fds */ +void grpc_fd_ref(grpc_fd *fd); +void grpc_fd_unref(grpc_fd *fd); + +#endif /* __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ */ diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c new file mode 100644 index 0000000000..03f56a50a3 --- /dev/null +++ b/src/core/iomgr/iomgr.c @@ -0,0 +1,204 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/iomgr.h" + +#include <stdlib.h> + +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/alarm_internal.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/thd.h> +#include <grpc/support/sync.h> + +typedef struct delayed_callback { + grpc_iomgr_cb_func cb; + void *cb_arg; + int success; + struct delayed_callback *next; +} delayed_callback; + +static gpr_mu g_mu; +static gpr_cv g_cv; +static delayed_callback *g_cbs_head = NULL; +static delayed_callback *g_cbs_tail = NULL; +static int g_shutdown; +static int g_refs; +static gpr_event g_background_callback_executor_done; + +/* Execute followup callbacks continuously. + Other threads may check in and help during pollset_work() */ +static void background_callback_executor(void *ignored) { + gpr_mu_lock(&g_mu); + while (!g_shutdown) { + gpr_timespec deadline = gpr_inf_future; + if (g_cbs_head) { + delayed_callback *cb = g_cbs_head; + g_cbs_head = cb->next; + if (!g_cbs_head) g_cbs_tail = NULL; + gpr_mu_unlock(&g_mu); + cb->cb(cb->cb_arg, cb->success); + gpr_free(cb); + gpr_mu_lock(&g_mu); + } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) { + } else { + gpr_cv_wait(&g_cv, &g_mu, deadline); + } + } + gpr_mu_unlock(&g_mu); + gpr_event_set(&g_background_callback_executor_done, (void *)1); +} + +void grpc_kick_poller() { gpr_cv_broadcast(&g_cv); } + +void grpc_iomgr_init() { + gpr_thd_id id; + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv); + grpc_alarm_list_init(gpr_now()); + g_refs = 0; + grpc_iomgr_platform_init(); + gpr_event_init(&g_background_callback_executor_done); + gpr_thd_new(&id, background_callback_executor, NULL, NULL); +} + +void grpc_iomgr_shutdown() { + delayed_callback *cb; + gpr_timespec shutdown_deadline = + gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); + + grpc_iomgr_platform_shutdown(); + + gpr_mu_lock(&g_mu); + g_shutdown = 1; + while (g_cbs_head || g_refs) { + gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs, + g_cbs_head ? " and executing final callbacks" : ""); + while (g_cbs_head) { + cb = g_cbs_head; + g_cbs_head = cb->next; + if (!g_cbs_head) g_cbs_tail = NULL; + gpr_mu_unlock(&g_mu); + + cb->cb(cb->cb_arg, 0); + gpr_free(cb); + gpr_mu_lock(&g_mu); + } + if (g_refs) { + if (gpr_cv_wait(&g_cv, &g_mu, shutdown_deadline) && g_cbs_head == NULL) { + gpr_log(GPR_DEBUG, + "Failed to free %d iomgr objects before shutdown deadline: " + "memory leaks are likely", + g_refs); + break; + } + } + } + gpr_mu_unlock(&g_mu); + + gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future); + + grpc_alarm_list_shutdown(); + gpr_mu_destroy(&g_mu); + gpr_cv_destroy(&g_cv); +} + +void grpc_iomgr_ref() { + gpr_mu_lock(&g_mu); + ++g_refs; + gpr_mu_unlock(&g_mu); +} + +void grpc_iomgr_unref() { + gpr_mu_lock(&g_mu); + if (0 == --g_refs) { + gpr_cv_signal(&g_cv); + } + gpr_mu_unlock(&g_mu); +} + +void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, + int success) { + delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback)); + dcb->cb = cb; + dcb->cb_arg = cb_arg; + dcb->success = success; + gpr_mu_lock(&g_mu); + dcb->next = NULL; + if (!g_cbs_tail) { + g_cbs_head = g_cbs_tail = dcb; + } else { + g_cbs_tail->next = dcb; + g_cbs_tail = dcb; + } + gpr_cv_signal(&g_cv); + gpr_mu_unlock(&g_mu); +} + +void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) { + grpc_iomgr_add_delayed_callback(cb, cb_arg, 1); +} + +int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { + int n = 0; + gpr_mu *retake_mu = NULL; + delayed_callback *cb; + for (;;) { + /* check for new work */ + if (!gpr_mu_trylock(&g_mu)) { + break; + } + cb = g_cbs_head; + if (!cb) { + gpr_mu_unlock(&g_mu); + break; + } + g_cbs_head = cb->next; + if (!g_cbs_head) g_cbs_tail = NULL; + gpr_mu_unlock(&g_mu); + /* if we have a mutex to drop, do so before executing work */ + if (drop_mu) { + gpr_mu_unlock(drop_mu); + retake_mu = drop_mu; + drop_mu = NULL; + } + cb->cb(cb->cb_arg, success && cb->success); + gpr_free(cb); + n++; + } + if (retake_mu) { + gpr_mu_lock(retake_mu); + } + return n; +} diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index cf39f947bc..16991a9b90 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -34,17 +34,8 @@ #ifndef __GRPC_INTERNAL_IOMGR_IOMGR_H__ #define __GRPC_INTERNAL_IOMGR_IOMGR_H__ -/* Status passed to callbacks for grpc_em_fd_notify_on_read and - grpc_em_fd_notify_on_write. */ -typedef enum grpc_em_cb_status { - GRPC_CALLBACK_SUCCESS = 0, - GRPC_CALLBACK_TIMED_OUT, - GRPC_CALLBACK_CANCELLED, - GRPC_CALLBACK_DO_NOT_USE -} grpc_iomgr_cb_status; - /* gRPC Callback definition */ -typedef void (*grpc_iomgr_cb_func)(void *arg, grpc_iomgr_cb_status status); +typedef void (*grpc_iomgr_cb_func)(void *arg, int success); void grpc_iomgr_init(); void grpc_iomgr_shutdown(); diff --git a/src/core/iomgr/iomgr_libevent_use_threads.c b/src/core/iomgr/iomgr_internal.h index af449342f0..5f72542777 100644 --- a/src/core/iomgr/iomgr_libevent_use_threads.c +++ b/src/core/iomgr/iomgr_internal.h @@ -31,26 +31,21 @@ * */ -/* Posix grpc event manager support code. */ -#include <grpc/support/log.h> +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ +#define __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ + +#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/iomgr_internal.h" #include <grpc/support/sync.h> -#include <event2/thread.h> -static int error_code = 0; -static gpr_once threads_once = GPR_ONCE_INIT; -static void evthread_threads_initialize(void) { - error_code = evthread_use_pthreads(); - if (error_code) { - gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!"); - } -} +int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success); +void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, + int success); + +void grpc_iomgr_ref(); +void grpc_iomgr_unref(); + +void grpc_iomgr_platform_init(); +void grpc_iomgr_platform_shutdown(); -/* Notify LibEvent that Posix pthread is used. */ -int evthread_use_threads() { - gpr_once_init(&threads_once, &evthread_threads_initialize); - /* For Pthreads or Windows threads, Libevent provides simple APIs to set - mutexes and conditional variables to support cross thread operations. - For other platforms, LibEvent provide callback APIs to hook mutexes and - conditional variables. */ - return error_code; -} +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ */ diff --git a/src/core/iomgr/iomgr_libevent.c b/src/core/iomgr/iomgr_libevent.c deleted file mode 100644 index 6188ab2749..0000000000 --- a/src/core/iomgr/iomgr_libevent.c +++ /dev/null @@ -1,652 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/iomgr/iomgr_libevent.h" - -#include <unistd.h> -#include <fcntl.h> - -#include "src/core/iomgr/alarm.h" -#include "src/core/iomgr/alarm_internal.h" -#include <grpc/support/atm.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/sync.h> -#include <grpc/support/thd.h> -#include <grpc/support/time.h> -#include <event2/event.h> -#include <event2/thread.h> - -#define ALARM_TRIGGER_INIT ((gpr_atm)0) -#define ALARM_TRIGGER_INCREMENT ((gpr_atm)1) -#define DONE_SHUTDOWN ((void *)1) - -#define POLLER_ID_INVALID ((gpr_atm)-1) - -/* Global data */ -struct event_base *g_event_base; -gpr_mu grpc_iomgr_mu; -gpr_cv grpc_iomgr_cv; -static grpc_libevent_activation_data *g_activation_queue; -static int g_num_pollers; -static int g_num_fds; -static int g_num_address_resolutions; -static gpr_timespec g_last_poll_completed; -static int g_shutdown_backup_poller; -static gpr_event g_backup_poller_done; -/* activated to break out of the event loop early */ -static struct event *g_timeout_ev; -/* activated to safely break polling from other threads */ -static struct event *g_break_ev; -static grpc_fd *g_fds_to_free; - -int evthread_use_threads(void); -static void grpc_fd_impl_destroy(grpc_fd *impl); - -void grpc_iomgr_ref_address_resolution(int delta) { - gpr_mu_lock(&grpc_iomgr_mu); - GPR_ASSERT(!g_shutdown_backup_poller); - g_num_address_resolutions += delta; - if (0 == g_num_address_resolutions) { - gpr_cv_broadcast(&grpc_iomgr_cv); - } - gpr_mu_unlock(&grpc_iomgr_mu); -} - -/* If anything is in the work queue, process one item and return 1. - Return 0 if there were no work items to complete. - Requires grpc_iomgr_mu locked, may unlock and relock during the call. */ -static int maybe_do_queue_work() { - grpc_libevent_activation_data *work = g_activation_queue; - - if (work == NULL) return 0; - - if (work->next == work) { - g_activation_queue = NULL; - } else { - g_activation_queue = work->next; - g_activation_queue->prev = work->prev; - g_activation_queue->next->prev = g_activation_queue->prev->next = - g_activation_queue; - } - work->next = work->prev = NULL; - /* force status to cancelled from ok when shutting down */ - if (g_shutdown_backup_poller && work->status == GRPC_CALLBACK_SUCCESS) { - work->status = GRPC_CALLBACK_CANCELLED; - } - gpr_mu_unlock(&grpc_iomgr_mu); - - work->cb(work->arg, work->status); - - gpr_mu_lock(&grpc_iomgr_mu); - return 1; -} - -/* Break out of the event loop on timeout */ -static void timer_callback(int fd, short events, void *context) { - event_base_loopbreak((struct event_base *)context); -} - -static void break_callback(int fd, short events, void *context) { - event_base_loopbreak((struct event_base *)context); -} - -static void free_fd_list(grpc_fd *impl) { - while (impl != NULL) { - grpc_fd *current = impl; - impl = impl->next; - grpc_fd_impl_destroy(current); - current->on_done(current->on_done_user_data, GRPC_CALLBACK_SUCCESS); - gpr_free(current); - } -} - -static void maybe_free_fds() { - if (g_fds_to_free) { - free_fd_list(g_fds_to_free); - g_fds_to_free = NULL; - } -} - -void grpc_kick_poller() { event_active(g_break_ev, EV_READ, 0); } - -/* Spend some time doing polling and libevent maintenance work if no other - thread is. This includes both polling for events and destroying/closing file - descriptor objects. - Returns 1 if polling was performed, 0 otherwise. - Requires grpc_iomgr_mu locked, may unlock and relock during the call. */ -static int maybe_do_polling_work(struct timeval delay) { - int status; - - if (g_num_pollers) return 0; - - g_num_pollers = 1; - - maybe_free_fds(); - - gpr_mu_unlock(&grpc_iomgr_mu); - - event_add(g_timeout_ev, &delay); - status = event_base_loop(g_event_base, EVLOOP_ONCE); - if (status < 0) { - gpr_log(GPR_ERROR, "event polling loop stops with error status %d", status); - } - event_del(g_timeout_ev); - - gpr_mu_lock(&grpc_iomgr_mu); - maybe_free_fds(); - - g_num_pollers = 0; - gpr_cv_broadcast(&grpc_iomgr_cv); - return 1; -} - -static int maybe_do_alarm_work(gpr_timespec now, gpr_timespec next) { - int r = 0; - if (gpr_time_cmp(next, now) < 0) { - gpr_mu_unlock(&grpc_iomgr_mu); - r = grpc_alarm_check(now); - gpr_mu_lock(&grpc_iomgr_mu); - } - return r; -} - -int grpc_iomgr_work(gpr_timespec deadline) { - gpr_timespec now = gpr_now(); - gpr_timespec next = grpc_alarm_list_next_timeout(); - gpr_timespec delay_timespec = gpr_time_sub(deadline, now); - /* poll for no longer than one second */ - gpr_timespec max_delay = gpr_time_from_seconds(1); - struct timeval delay; - - if (gpr_time_cmp(delay_timespec, gpr_time_0) <= 0) { - return 0; - } - - if (gpr_time_cmp(delay_timespec, max_delay) > 0) { - delay_timespec = max_delay; - } - - /* Adjust delay to account for the next alarm, if applicable. */ - delay_timespec = gpr_time_min( - delay_timespec, gpr_time_sub(grpc_alarm_list_next_timeout(), now)); - - delay = gpr_timeval_from_timespec(delay_timespec); - - if (maybe_do_queue_work() || maybe_do_alarm_work(now, next) || - maybe_do_polling_work(delay)) { - g_last_poll_completed = gpr_now(); - return 1; - } - - return 0; -} - -static void backup_poller_thread(void *p) { - int backup_poller_engaged = 0; - /* allow no pollers for 100 milliseconds, then engage backup polling */ - gpr_timespec allow_no_pollers = gpr_time_from_millis(100); - - gpr_mu_lock(&grpc_iomgr_mu); - while (!g_shutdown_backup_poller) { - if (g_num_pollers == 0) { - gpr_timespec now = gpr_now(); - gpr_timespec time_until_engage = gpr_time_sub( - allow_no_pollers, gpr_time_sub(now, g_last_poll_completed)); - if (gpr_time_cmp(time_until_engage, gpr_time_0) <= 0) { - if (!backup_poller_engaged) { - gpr_log(GPR_DEBUG, "No pollers for a while - engaging backup poller"); - backup_poller_engaged = 1; - } - if (!maybe_do_queue_work()) { - gpr_timespec next = grpc_alarm_list_next_timeout(); - if (!maybe_do_alarm_work(now, next)) { - gpr_timespec deadline = - gpr_time_min(next, gpr_time_add(now, gpr_time_from_seconds(1))); - maybe_do_polling_work( - gpr_timeval_from_timespec(gpr_time_sub(deadline, now))); - } - } - } else { - if (backup_poller_engaged) { - gpr_log(GPR_DEBUG, "Backup poller disengaged"); - backup_poller_engaged = 0; - } - gpr_mu_unlock(&grpc_iomgr_mu); - gpr_sleep_until(gpr_time_add(now, time_until_engage)); - gpr_mu_lock(&grpc_iomgr_mu); - } - } else { - if (backup_poller_engaged) { - gpr_log(GPR_DEBUG, "Backup poller disengaged"); - backup_poller_engaged = 0; - } - gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, gpr_inf_future); - } - } - gpr_mu_unlock(&grpc_iomgr_mu); - - gpr_event_set(&g_backup_poller_done, (void *)1); -} - -void grpc_iomgr_init() { - gpr_thd_id backup_poller_id; - - if (evthread_use_threads() != 0) { - gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!"); - abort(); - } - - grpc_alarm_list_init(gpr_now()); - - gpr_mu_init(&grpc_iomgr_mu); - gpr_cv_init(&grpc_iomgr_cv); - g_activation_queue = NULL; - g_num_pollers = 0; - g_num_fds = 0; - g_num_address_resolutions = 0; - g_last_poll_completed = gpr_now(); - g_shutdown_backup_poller = 0; - g_fds_to_free = NULL; - - gpr_event_init(&g_backup_poller_done); - - g_event_base = NULL; - g_timeout_ev = NULL; - g_break_ev = NULL; - - g_event_base = event_base_new(); - if (!g_event_base) { - gpr_log(GPR_ERROR, "Failed to create the event base"); - abort(); - } - - if (evthread_make_base_notifiable(g_event_base) != 0) { - gpr_log(GPR_ERROR, "Couldn't make event base notifiable cross threads!"); - abort(); - } - - g_timeout_ev = evtimer_new(g_event_base, timer_callback, g_event_base); - g_break_ev = event_new(g_event_base, -1, EV_READ | EV_PERSIST, break_callback, - g_event_base); - - event_add(g_break_ev, NULL); - - gpr_thd_new(&backup_poller_id, backup_poller_thread, NULL, NULL); -} - -void grpc_iomgr_shutdown() { - gpr_timespec fd_shutdown_deadline = - gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); - - /* broadcast shutdown */ - gpr_mu_lock(&grpc_iomgr_mu); - while (g_num_fds > 0 || g_num_address_resolutions > 0) { - gpr_log(GPR_INFO, - "waiting for %d fds and %d name resolutions to be destroyed before " - "closing event manager", - g_num_fds, g_num_address_resolutions); - if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, fd_shutdown_deadline)) { - gpr_log(GPR_ERROR, - "not all fds or name resolutions destroyed before shutdown " - "deadline: memory leaks " - "are likely"); - break; - } else if (g_num_fds == 0 && g_num_address_resolutions == 0) { - gpr_log(GPR_INFO, "all fds closed, all name resolutions finished"); - } - } - - g_shutdown_backup_poller = 1; - gpr_cv_broadcast(&grpc_iomgr_cv); - gpr_mu_unlock(&grpc_iomgr_mu); - - gpr_event_wait(&g_backup_poller_done, gpr_inf_future); - - grpc_alarm_list_shutdown(); - - /* drain pending work */ - gpr_mu_lock(&grpc_iomgr_mu); - while (maybe_do_queue_work()) - ; - gpr_mu_unlock(&grpc_iomgr_mu); - - free_fd_list(g_fds_to_free); - - /* complete shutdown */ - gpr_mu_destroy(&grpc_iomgr_mu); - gpr_cv_destroy(&grpc_iomgr_cv); - - if (g_timeout_ev != NULL) { - event_free(g_timeout_ev); - } - - if (g_break_ev != NULL) { - event_free(g_break_ev); - } - - if (g_event_base != NULL) { - event_base_free(g_event_base); - g_event_base = NULL; - } -} - -static void add_task(grpc_libevent_activation_data *adata) { - gpr_mu_lock(&grpc_iomgr_mu); - if (g_activation_queue) { - adata->next = g_activation_queue; - adata->prev = adata->next->prev; - adata->next->prev = adata->prev->next = adata; - } else { - g_activation_queue = adata; - adata->next = adata->prev = adata; - } - gpr_cv_broadcast(&grpc_iomgr_cv); - gpr_mu_unlock(&grpc_iomgr_mu); -} - -static void grpc_fd_impl_destroy(grpc_fd *impl) { - grpc_em_task_activity_type type; - grpc_libevent_activation_data *adata; - - for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) { - adata = &(impl->task.activation[type]); - GPR_ASSERT(adata->next == NULL); - if (adata->ev != NULL) { - event_free(adata->ev); - adata->ev = NULL; - } - } - - if (impl->shutdown_ev != NULL) { - event_free(impl->shutdown_ev); - impl->shutdown_ev = NULL; - } - gpr_mu_destroy(&impl->mu); - close(impl->fd); -} - -/* Proxy callback to call a gRPC read/write callback */ -static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) { - grpc_fd *em_fd = arg; - grpc_iomgr_cb_status status = GRPC_CALLBACK_SUCCESS; - int run_read_cb = 0; - int run_write_cb = 0; - grpc_libevent_activation_data *rdata, *wdata; - - gpr_mu_lock(&em_fd->mu); - if (em_fd->shutdown_started) { - status = GRPC_CALLBACK_CANCELLED; - } else if (status == GRPC_CALLBACK_SUCCESS && (what & EV_TIMEOUT)) { - status = GRPC_CALLBACK_TIMED_OUT; - /* TODO(klempner): This is broken if we are monitoring both read and write - events on the same fd -- generating a spurious event is okay, but - generating a spurious timeout is not. */ - what |= (EV_READ | EV_WRITE); - } - - if (what & EV_READ) { - switch (em_fd->read_state) { - case GRPC_FD_WAITING: - run_read_cb = 1; - em_fd->read_state = GRPC_FD_IDLE; - break; - case GRPC_FD_IDLE: - case GRPC_FD_CACHED: - em_fd->read_state = GRPC_FD_CACHED; - } - } - if (what & EV_WRITE) { - switch (em_fd->write_state) { - case GRPC_FD_WAITING: - run_write_cb = 1; - em_fd->write_state = GRPC_FD_IDLE; - break; - case GRPC_FD_IDLE: - case GRPC_FD_CACHED: - em_fd->write_state = GRPC_FD_CACHED; - } - } - - if (run_read_cb) { - rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]); - rdata->status = status; - add_task(rdata); - } else if (run_write_cb) { - wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]); - wdata->status = status; - add_task(wdata); - } - gpr_mu_unlock(&em_fd->mu); -} - -static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) { - /* TODO(klempner): This could just run directly in the calling thread, except - that libevent's handling of event_active() on an event which is already in - flight on a different thread is racy and easily triggers TSAN. - */ - grpc_fd *impl = arg; - gpr_mu_lock(&impl->mu); - impl->shutdown_started = 1; - if (impl->read_state == GRPC_FD_WAITING) { - event_active(impl->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1); - } - if (impl->write_state == GRPC_FD_WAITING) { - event_active(impl->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1); - } - gpr_mu_unlock(&impl->mu); -} - -grpc_fd *grpc_fd_create(int fd) { - int flags; - grpc_libevent_activation_data *rdata, *wdata; - grpc_fd *impl = gpr_malloc(sizeof(grpc_fd)); - - gpr_mu_lock(&grpc_iomgr_mu); - g_num_fds++; - gpr_mu_unlock(&grpc_iomgr_mu); - - impl->shutdown_ev = NULL; - gpr_mu_init(&impl->mu); - - flags = fcntl(fd, F_GETFL, 0); - GPR_ASSERT((flags & O_NONBLOCK) != 0); - - impl->task.type = GRPC_EM_TASK_FD; - impl->fd = fd; - - rdata = &(impl->task.activation[GRPC_EM_TA_READ]); - rdata->ev = NULL; - rdata->cb = NULL; - rdata->arg = NULL; - rdata->status = GRPC_CALLBACK_SUCCESS; - rdata->prev = NULL; - rdata->next = NULL; - - wdata = &(impl->task.activation[GRPC_EM_TA_WRITE]); - wdata->ev = NULL; - wdata->cb = NULL; - wdata->arg = NULL; - wdata->status = GRPC_CALLBACK_SUCCESS; - wdata->prev = NULL; - wdata->next = NULL; - - impl->read_state = GRPC_FD_IDLE; - impl->write_state = GRPC_FD_IDLE; - - impl->shutdown_started = 0; - impl->next = NULL; - - /* TODO(chenw): detect platforms where only level trigger is supported, - and set the event to non-persist. */ - rdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ, - em_fd_cb, impl); - GPR_ASSERT(rdata->ev); - - wdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE, - em_fd_cb, impl); - GPR_ASSERT(wdata->ev); - - impl->shutdown_ev = - event_new(g_event_base, -1, EV_READ, em_fd_shutdown_cb, impl); - GPR_ASSERT(impl->shutdown_ev); - - return impl; -} - -static void do_nothing(void *ignored, grpc_iomgr_cb_status also_ignored) {} - -void grpc_fd_destroy(grpc_fd *impl, grpc_iomgr_cb_func on_done, - void *user_data) { - if (on_done == NULL) on_done = do_nothing; - - gpr_mu_lock(&grpc_iomgr_mu); - - /* Put the impl on the list to be destroyed by the poller. */ - impl->on_done = on_done; - impl->on_done_user_data = user_data; - impl->next = g_fds_to_free; - g_fds_to_free = impl; - /* TODO(ctiller): kick the poller so it destroys this fd promptly - (currently we may wait up to a second) */ - - g_num_fds--; - gpr_cv_broadcast(&grpc_iomgr_cv); - gpr_mu_unlock(&grpc_iomgr_mu); -} - -int grpc_fd_get(struct grpc_fd *em_fd) { return em_fd->fd; } - -/* TODO(chenw): should we enforce the contract that notify_on_read cannot be - called when the previously registered callback has not been called yet. */ -int grpc_fd_notify_on_read(grpc_fd *impl, grpc_iomgr_cb_func read_cb, - void *read_cb_arg, gpr_timespec deadline) { - int force_event = 0; - grpc_libevent_activation_data *rdata; - gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); - struct timeval delay = gpr_timeval_from_timespec(delay_timespec); - struct timeval *delayp = - gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL; - - rdata = &impl->task.activation[GRPC_EM_TA_READ]; - - gpr_mu_lock(&impl->mu); - rdata->cb = read_cb; - rdata->arg = read_cb_arg; - - force_event = (impl->shutdown_started || impl->read_state == GRPC_FD_CACHED); - impl->read_state = GRPC_FD_WAITING; - - if (force_event) { - event_active(rdata->ev, EV_READ, 1); - } else if (event_add(rdata->ev, delayp) == -1) { - gpr_mu_unlock(&impl->mu); - return 0; - } - gpr_mu_unlock(&impl->mu); - return 1; -} - -int grpc_fd_notify_on_write(grpc_fd *impl, grpc_iomgr_cb_func write_cb, - void *write_cb_arg, gpr_timespec deadline) { - int force_event = 0; - grpc_libevent_activation_data *wdata; - gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); - struct timeval delay = gpr_timeval_from_timespec(delay_timespec); - struct timeval *delayp = - gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL; - - wdata = &impl->task.activation[GRPC_EM_TA_WRITE]; - - gpr_mu_lock(&impl->mu); - wdata->cb = write_cb; - wdata->arg = write_cb_arg; - - force_event = (impl->shutdown_started || impl->write_state == GRPC_FD_CACHED); - impl->write_state = GRPC_FD_WAITING; - - if (force_event) { - event_active(wdata->ev, EV_WRITE, 1); - } else if (event_add(wdata->ev, delayp) == -1) { - gpr_mu_unlock(&impl->mu); - return 0; - } - gpr_mu_unlock(&impl->mu); - return 1; -} - -void grpc_fd_shutdown(grpc_fd *em_fd) { - event_active(em_fd->shutdown_ev, EV_READ, 1); -} - -/* Sometimes we want a followup callback: something to be added from the - current callback for the EM to invoke once this callback is complete. - This is implemented by inserting an entry into an EM queue. */ - -/* The following structure holds the field needed for adding the - followup callback. These are the argument for the followup callback, - the function to use for the followup callback, and the - activation data pointer used for the queues (to free in the CB) */ -struct followup_callback_arg { - grpc_iomgr_cb_func func; - void *cb_arg; - grpc_libevent_activation_data adata; -}; - -static void followup_proxy_callback(void *cb_arg, grpc_iomgr_cb_status status) { - struct followup_callback_arg *fcb_arg = cb_arg; - /* Invoke the function */ - fcb_arg->func(fcb_arg->cb_arg, status); - gpr_free(fcb_arg); -} - -void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) { - grpc_libevent_activation_data *adptr; - struct followup_callback_arg *fcb_arg; - - fcb_arg = gpr_malloc(sizeof(*fcb_arg)); - /* Set up the activation data and followup callback argument structures */ - adptr = &fcb_arg->adata; - adptr->ev = NULL; - adptr->cb = followup_proxy_callback; - adptr->arg = fcb_arg; - adptr->status = GRPC_CALLBACK_SUCCESS; - adptr->prev = NULL; - adptr->next = NULL; - - fcb_arg->func = cb; - fcb_arg->cb_arg = cb_arg; - - /* Insert an activation data for the specified em */ - add_task(adptr); -} diff --git a/src/core/iomgr/iomgr_libevent.h b/src/core/iomgr/iomgr_libevent.h deleted file mode 100644 index 5c088006a0..0000000000 --- a/src/core/iomgr/iomgr_libevent.h +++ /dev/null @@ -1,206 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ -#define __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ - -#include "src/core/iomgr/iomgr.h" -#include <grpc/support/sync.h> -#include <grpc/support/time.h> - -typedef struct grpc_fd grpc_fd; - -/* gRPC event manager task "base class". This is pretend-inheritance in C89. - This should be the first member of any actual grpc_em task type. - - Memory warning: expanding this will increase memory usage in any derived - class, so be careful. - - For generality, this base can be on multiple task queues and can have - multiple event callbacks registered. Not all "derived classes" will use - this feature. */ - -typedef enum grpc_libevent_task_type { - GRPC_EM_TASK_ALARM, - GRPC_EM_TASK_FD, - GRPC_EM_TASK_DO_NOT_USE -} grpc_libevent_task_type; - -/* Different activity types to shape the callback and queueing arrays */ -typedef enum grpc_em_task_activity_type { - GRPC_EM_TA_READ, /* use this also for single-type events */ - GRPC_EM_TA_WRITE, - GRPC_EM_TA_COUNT -} grpc_em_task_activity_type; - -/* Include the following #define for convenience for tasks like alarms that - only have a single type */ -#define GRPC_EM_TA_ONLY GRPC_EM_TA_READ - -typedef struct grpc_libevent_activation_data { - struct event *ev; /* event activated on this callback type */ - grpc_iomgr_cb_func cb; /* function pointer for callback */ - void *arg; /* argument passed to cb */ - - /* Hold the status associated with the callback when queued */ - grpc_iomgr_cb_status status; - /* Now set up to link activations into scheduler queues */ - struct grpc_libevent_activation_data *prev; - struct grpc_libevent_activation_data *next; -} grpc_libevent_activation_data; - -typedef struct grpc_libevent_task { - grpc_libevent_task_type type; - - /* Now have an array of activation data elements: one for each activity - type that could get activated */ - grpc_libevent_activation_data activation[GRPC_EM_TA_COUNT]; -} grpc_libevent_task; - -/* Initialize *em_fd. - Requires fd is a non-blocking file descriptor. - - This takes ownership of closing fd. - - Requires: *em_fd uninitialized. fd is a non-blocking file descriptor. */ -grpc_fd *grpc_fd_create(int fd); - -/* Cause *em_fd no longer to be initialized and closes the underlying fd. - on_done is called when the underlying file descriptor is definitely close()d. - If on_done is NULL, no callback will be made. - Requires: *em_fd initialized; no outstanding notify_on_read or - notify_on_write. */ -void grpc_fd_destroy(grpc_fd *em_fd, grpc_iomgr_cb_func on_done, - void *user_data); - -/* Returns the file descriptor associated with *em_fd. */ -int grpc_fd_get(grpc_fd *em_fd); - -/* Register read interest, causing read_cb to be called once when em_fd becomes - readable, on deadline specified by deadline, or on shutdown triggered by - grpc_fd_shutdown. - read_cb will be called with read_cb_arg when *em_fd becomes readable. - read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable, - GRPC_CALLBACK_TIMED_OUT if the call timed out, - and CANCELLED if the call was cancelled. - - Requires:This method must not be called before the read_cb for any previous - call runs. Edge triggered events are used whenever they are supported by the - underlying platform. This means that users must drain em_fd in read_cb before - calling notify_on_read again. Users are also expected to handle spurious - events, i.e read_cb is called while nothing can be readable from em_fd */ -int grpc_fd_notify_on_read(grpc_fd *em_fd, grpc_iomgr_cb_func read_cb, - void *read_cb_arg, gpr_timespec deadline); - -/* Exactly the same semantics as above, except based on writable events. */ -int grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, - void *write_cb_arg, gpr_timespec deadline); - -/* Cause any current and all future read/write callbacks to error out with - GRPC_CALLBACK_CANCELLED. */ -void grpc_fd_shutdown(grpc_fd *em_fd); - -/* =================== Event caching =================== - In order to not miss or double-return edges in the context of edge triggering - and multithreading, we need a per-fd caching layer in the eventmanager itself - to cache relevant events. - - There are two types of events we care about: calls to notify_on_[read|write] - and readable/writable events for the socket from eventfd. There are separate - event caches for read and write. - - There are three states: - 0. "waiting" -- There's been a call to notify_on_[read|write] which has not - had a corresponding event. In other words, we're waiting for an event so we - can run the callback. - 1. "idle" -- We are neither waiting nor have a cached event. - 2. "cached" -- There has been a read/write event without a waiting callback, - so we want to run the event next time the application calls - notify_on_[read|write]. - - The high level state diagram: - - +--------------------------------------------------------------------+ - | WAITING | IDLE | CACHED | - | | | | - | 1. --*-> 2. --+-> 3. --+\ - | | | <--+/ - | | | | - x+-- 6. 5. <-+-- 4. <-*-- | - | | | | - +--------------------------------------------------------------------+ - - Transitions right occur on read|write events. Transitions left occur on - notify_on_[read|write] events. - State transitions: - 1. Read|Write event while waiting -> run the callback and transition to idle. - 2. Read|Write event while idle -> transition to cached. - 3. Read|Write event with one already cached -> still cached. - 4. notify_on_[read|write] with event cached: run callback and transition to - idle. - 5. notify_on_[read|write] when idle: Store callback and transition to - waiting. - 6. notify_on_[read|write] when waiting: invalid. */ - -typedef enum grpc_fd_state { - GRPC_FD_WAITING = 0, - GRPC_FD_IDLE = 1, - GRPC_FD_CACHED = 2 -} grpc_fd_state; - -/* gRPC file descriptor handle. - The handle is used to register read/write callbacks to a file descriptor */ -struct grpc_fd { - grpc_libevent_task task; /* Base class, callbacks, queues, etc */ - int fd; /* File descriptor */ - - /* Note that the shutdown event is only needed as a workaround for libevent - not properly handling event_active on an in flight event. */ - struct event *shutdown_ev; /* activated to trigger shutdown */ - - /* protect shutdown_started|read_state|write_state and ensure barriers - between notify_on_[read|write] and read|write callbacks */ - gpr_mu mu; - int shutdown_started; /* 0 -> shutdown not started, 1 -> started */ - grpc_fd_state read_state; - grpc_fd_state write_state; - - /* descriptor delete list. These are destroyed during polling. */ - struct grpc_fd *next; - grpc_iomgr_cb_func on_done; - void *on_done_user_data; -}; - -void grpc_iomgr_ref_address_resolution(int delta); - -#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */ diff --git a/src/core/iomgr/pollset.c b/src/core/iomgr/iomgr_posix.c index 62a0019eb3..ff9195ec1d 100644 --- a/src/core/iomgr/pollset.c +++ b/src/core/iomgr/iomgr_posix.c @@ -31,7 +31,8 @@ * */ -#include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/iomgr_posix.h" -void grpc_pollset_init(grpc_pollset *pollset) { pollset->unused = 0; } -void grpc_pollset_destroy(grpc_pollset *pollset) {} +void grpc_iomgr_platform_init() { grpc_pollset_global_init(); } + +void grpc_iomgr_platform_shutdown() { grpc_pollset_global_shutdown(); } diff --git a/src/core/iomgr/iomgr_completion_queue_interface.h b/src/core/iomgr/iomgr_posix.h index 3c4efe773a..ca5af3e527 100644 --- a/src/core/iomgr/iomgr_completion_queue_interface.h +++ b/src/core/iomgr/iomgr_posix.h @@ -31,15 +31,12 @@ * */ -#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ -#define __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ +#define __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ -/* Internals of iomgr that are exposed only to be used for completion queue - implementation */ +#include "src/core/iomgr/iomgr_internal.h" -extern gpr_mu grpc_iomgr_mu; -extern gpr_cv grpc_iomgr_cv; +void grpc_pollset_global_init(); +void grpc_pollset_global_shutdown(); -int grpc_iomgr_work(gpr_timespec deadline); - -#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ */ +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ */ diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index ba1a9d5429..7374a4ec13 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -34,18 +34,31 @@ #ifndef __GRPC_INTERNAL_IOMGR_POLLSET_H_ #define __GRPC_INTERNAL_IOMGR_POLLSET_H_ +#include <grpc/support/port_platform.h> + /* A grpc_pollset is a set of file descriptors that a higher level item is interested in. For example: - a server will typically keep a pollset containing all connected channels, so that it can find new calls to service - a completion queue might keep a pollset with an entry for each transport that is servicing a call that it's tracking */ -/* Eventually different implementations of iomgr will provide their own - grpc_pollset structs. As this is just a dummy wrapper to get the API in, - we just define a simple type here. */ -typedef struct { char unused; } grpc_pollset; + +#ifdef GPR_POSIX_SOCKET +#include "src/core/iomgr/pollset_posix.h" +#endif void grpc_pollset_init(grpc_pollset *pollset); void grpc_pollset_destroy(grpc_pollset *pollset); +/* Do some work on a pollset. + May involve invoking asynchronous callbacks, or actually polling file + descriptors. + Requires GRPC_POLLSET_MU(pollset) locked. + May unlock GRPC_POLLSET_MU(pollset) during its execution. */ +int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline); + +/* Break a pollset out of polling work + Requires GRPC_POLLSET_MU(pollset) locked. */ +void grpc_pollset_kick(grpc_pollset *pollset); + #endif /* __GRPC_INTERNAL_IOMGR_POLLSET_H_ */ diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c new file mode 100644 index 0000000000..06c7a5a0dd --- /dev/null +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -0,0 +1,237 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_MULTIPOLL_WITH_POLL + +#include "src/core/iomgr/pollset_posix.h" + +#include <errno.h> +#include <poll.h> +#include <stdlib.h> +#include <string.h> + +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> + +typedef struct { + /* all polled fds */ + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; + /* fds being polled by the current poller: parallel arrays of pollfd and the + * grpc_fd* that the pollfd was constructed from */ + size_t pfd_count; + size_t pfd_capacity; + grpc_fd **selfds; + struct pollfd *pfds; + /* fds that have been removed from the pollset explicitly */ + size_t del_count; + size_t del_capacity; + grpc_fd **dels; +} pollset_hdr; + +static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, + grpc_fd *fd) { + size_t i; + pollset_hdr *h = pollset->data.ptr; + /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ + for (i = 0; i < h->fd_count; i++) { + if (h->fds[i] == fd) return; + } + if (h->fd_count == h->fd_capacity) { + h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2); + h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity); + } + h->fds[h->fd_count++] = fd; + grpc_fd_ref(fd); +} + +static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, + grpc_fd *fd) { + /* will get removed next poll cycle */ + pollset_hdr *h = pollset->data.ptr; + if (h->del_count == h->del_capacity) { + h->del_capacity = GPR_MAX(h->del_capacity + 8, h->del_count * 3 / 2); + h->dels = gpr_realloc(h->dels, sizeof(grpc_fd *) * h->del_capacity); + } + h->dels[h->del_count++] = fd; + grpc_fd_ref(fd); +} + +static void end_polling(grpc_pollset *pollset) { + size_t i; + pollset_hdr *h; + h = pollset->data.ptr; + for (i = 1; i < h->pfd_count; i++) { + grpc_fd_end_poll(h->selfds[i], pollset); + } +} + +static int multipoll_with_poll_pollset_maybe_work( + grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, + int allow_synchronous_callback) { + int timeout; + int r; + size_t i, np, nf, nd; + pollset_hdr *h; + + if (pollset->counter) { + return 0; + } + h = pollset->data.ptr; + if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { + timeout = -1; + } else { + timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); + if (timeout <= 0) { + return 1; + } + } + if (h->pfd_capacity < h->fd_count + 1) { + h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1); + gpr_free(h->pfds); + gpr_free(h->selfds); + h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity); + h->selfds = gpr_malloc(sizeof(grpc_fd *) * h->pfd_capacity); + } + nf = 0; + np = 1; + h->pfds[0].fd = grpc_kick_read_fd(pollset); + h->pfds[0].events = POLLIN; + h->pfds[0].revents = POLLOUT; + for (i = 0; i < h->fd_count; i++) { + int remove = grpc_fd_is_orphaned(h->fds[i]); + for (nd = 0; nd < h->del_count; nd++) { + if (h->fds[i] == h->dels[nd]) remove = 1; + } + if (remove) { + grpc_fd_unref(h->fds[i]); + } else { + h->fds[nf++] = h->fds[i]; + h->pfds[np].events = + grpc_fd_begin_poll(h->fds[i], pollset, POLLIN, POLLOUT); + h->selfds[np] = h->fds[i]; + h->pfds[np].fd = h->fds[i]->fd; + h->pfds[np].revents = 0; + np++; + } + } + h->pfd_count = np; + h->fd_count = nf; + for (nd = 0; nd < h->del_count; nd++) { + grpc_fd_unref(h->dels[nd]); + } + h->del_count = 0; + if (h->pfd_count == 0) { + end_polling(pollset); + return 0; + } + pollset->counter = 1; + gpr_mu_unlock(&pollset->mu); + + r = poll(h->pfds, h->pfd_count, timeout); + if (r < 0) { + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + } else if (r == 0) { + /* do nothing */ + } else { + if (h->pfds[0].revents & POLLIN) { + grpc_kick_drain(pollset); + } + for (i = 1; i < np; i++) { + if (h->pfds[i].revents & POLLIN) { + grpc_fd_become_readable(h->selfds[i], allow_synchronous_callback); + } + if (h->pfds[i].revents & POLLOUT) { + grpc_fd_become_writable(h->selfds[i], allow_synchronous_callback); + } + } + } + end_polling(pollset); + + gpr_mu_lock(&pollset->mu); + pollset->counter = 0; + gpr_cv_broadcast(&pollset->cv); + return 1; +} + +static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { + size_t i; + pollset_hdr *h = pollset->data.ptr; + GPR_ASSERT(pollset->counter == 0); + for (i = 0; i < h->fd_count; i++) { + grpc_fd_unref(h->fds[i]); + } + for (i = 0; i < h->del_count; i++) { + grpc_fd_unref(h->dels[i]); + } + gpr_free(h->pfds); + gpr_free(h->selfds); + gpr_free(h->fds); + gpr_free(h->dels); + gpr_free(h); +} + +static const grpc_pollset_vtable multipoll_with_poll_pollset = { + multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd, + multipoll_with_poll_pollset_maybe_work, + multipoll_with_poll_pollset_destroy}; + +void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, + size_t nfds) { + size_t i; + pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); + pollset->vtable = &multipoll_with_poll_pollset; + pollset->data.ptr = h; + h->fd_count = nfds; + h->fd_capacity = nfds; + h->fds = gpr_malloc(nfds * sizeof(grpc_fd *)); + h->pfd_count = 0; + h->pfd_capacity = 0; + h->pfds = NULL; + h->selfds = NULL; + h->del_count = 0; + h->del_capacity = 0; + h->dels = NULL; + for (i = 0; i < nfds; i++) { + h->fds[i] = fds[i]; + grpc_fd_ref(fds[i]); + } +} + +#endif diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c new file mode 100644 index 0000000000..ba4031e11f --- /dev/null +++ b/src/core/iomgr/pollset_posix.c @@ -0,0 +1,340 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/pollset_posix.h" + +#include <errno.h> +#include <poll.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include "src/core/iomgr/alarm_internal.h" +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/thd.h> +#include <grpc/support/useful.h> + +/* kick pipes: we keep a sharded set of pipes to allow breaking from poll. + Ideally this would be 1:1 with pollsets, but we'd like to avoid associating + full kernel objects with each pollset to keep them lightweight, so instead + keep a sharded set and allow associating a pollset with one of the shards. + + TODO(ctiller): move this out from this file, and allow an eventfd + implementation on linux */ + +#define LOG2_KICK_SHARDS 6 +#define KICK_SHARDS (1 << LOG2_KICK_SHARDS) + +static int g_kick_pipes[KICK_SHARDS][2]; +static grpc_pollset g_backup_pollset; +static int g_shutdown_backup_poller; +static gpr_event g_backup_poller_done; + +static void backup_poller(void *p) { + gpr_timespec delta = gpr_time_from_millis(100); + gpr_timespec last_poll = gpr_now(); + + gpr_mu_lock(&g_backup_pollset.mu); + while (g_shutdown_backup_poller == 0) { + gpr_timespec next_poll = gpr_time_add(last_poll, delta); + grpc_pollset_work(&g_backup_pollset, next_poll); + gpr_mu_unlock(&g_backup_pollset.mu); + gpr_sleep_until(next_poll); + gpr_mu_lock(&g_backup_pollset.mu); + last_poll = next_poll; + } + gpr_mu_unlock(&g_backup_pollset.mu); + + gpr_event_set(&g_backup_poller_done, (void *)1); +} + +static size_t kick_shard(const grpc_pollset *info) { + size_t x = (size_t)info; + return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (KICK_SHARDS - 1); +} + +int grpc_kick_read_fd(grpc_pollset *p) { + return g_kick_pipes[kick_shard(p)][0]; +} + +static int grpc_kick_write_fd(grpc_pollset *p) { + return g_kick_pipes[kick_shard(p)][1]; +} + +void grpc_pollset_force_kick(grpc_pollset *p) { + char c = 0; + while (write(grpc_kick_write_fd(p), &c, 1) != 1 && errno == EINTR) + ; +} + +void grpc_pollset_kick(grpc_pollset *p) { + if (!p->counter) return; + grpc_pollset_force_kick(p); +} + +void grpc_kick_drain(grpc_pollset *p) { + int fd = grpc_kick_read_fd(p); + char buf[128]; + int r; + + for (;;) { + r = read(fd, buf, sizeof(buf)); + if (r > 0) continue; + if (r == 0) return; + switch (errno) { + case EAGAIN: + return; + case EINTR: + continue; + default: + gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno)); + return; + } + } +} + +/* global state management */ + +grpc_pollset *grpc_backup_pollset() { return &g_backup_pollset; } + +void grpc_pollset_global_init() { + int i; + gpr_thd_id id; + + /* initialize the kick shards */ + for (i = 0; i < KICK_SHARDS; i++) { + GPR_ASSERT(0 == pipe(g_kick_pipes[i])); + GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][0], 1)); + GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][1], 1)); + } + + /* initialize the backup pollset */ + grpc_pollset_init(&g_backup_pollset); + + /* start the backup poller thread */ + g_shutdown_backup_poller = 0; + gpr_event_init(&g_backup_poller_done); + gpr_thd_new(&id, backup_poller, NULL, NULL); +} + +void grpc_pollset_global_shutdown() { + int i; + + /* terminate the backup poller thread */ + gpr_mu_lock(&g_backup_pollset.mu); + g_shutdown_backup_poller = 1; + gpr_mu_unlock(&g_backup_pollset.mu); + gpr_event_wait(&g_backup_poller_done, gpr_inf_future); + + /* destroy the backup pollset */ + grpc_pollset_destroy(&g_backup_pollset); + + /* destroy the kick shards */ + for (i = 0; i < KICK_SHARDS; i++) { + close(g_kick_pipes[i][0]); + close(g_kick_pipes[i][1]); + } +} + +/* main interface */ + +static void become_empty_pollset(grpc_pollset *pollset); +static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd); + +void grpc_pollset_init(grpc_pollset *pollset) { + gpr_mu_init(&pollset->mu); + gpr_cv_init(&pollset->cv); + become_empty_pollset(pollset); +} + +void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { + gpr_mu_lock(&pollset->mu); + pollset->vtable->add_fd(pollset, fd); + gpr_cv_broadcast(&pollset->cv); + gpr_mu_unlock(&pollset->mu); +} + +void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { + gpr_mu_lock(&pollset->mu); + pollset->vtable->del_fd(pollset, fd); + gpr_cv_broadcast(&pollset->cv); + gpr_mu_unlock(&pollset->mu); +} + +int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { + /* pollset->mu already held */ + gpr_timespec now; + now = gpr_now(); + if (gpr_time_cmp(now, deadline) > 0) { + return 0; + } + if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) { + return 1; + } + if (grpc_alarm_check(&pollset->mu, now, &deadline)) { + return 1; + } + return pollset->vtable->maybe_work(pollset, deadline, now, 1); +} + +void grpc_pollset_destroy(grpc_pollset *pollset) { + pollset->vtable->destroy(pollset); + gpr_mu_destroy(&pollset->mu); + gpr_cv_destroy(&pollset->cv); +} + +/* + * empty_pollset - a vtable that provides polling for NO file descriptors + */ + +static void empty_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { + become_unary_pollset(pollset, fd); +} + +static void empty_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {} + +static int empty_pollset_maybe_work(grpc_pollset *pollset, + gpr_timespec deadline, gpr_timespec now, + int allow_synchronous_callback) { + return 0; +} + +static void empty_pollset_destroy(grpc_pollset *pollset) {} + +static const grpc_pollset_vtable empty_pollset = { + empty_pollset_add_fd, empty_pollset_del_fd, empty_pollset_maybe_work, + empty_pollset_destroy}; + +static void become_empty_pollset(grpc_pollset *pollset) { + pollset->vtable = &empty_pollset; +} + +/* + * unary_poll_pollset - a vtable that provides polling for one file descriptor + * via poll() + */ + +static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { + grpc_fd *fds[2]; + if (fd == pollset->data.ptr) return; + fds[0] = pollset->data.ptr; + fds[1] = fd; + grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); + grpc_fd_unref(fds[0]); +} + +static void unary_poll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { + if (fd == pollset->data.ptr) { + grpc_fd_unref(pollset->data.ptr); + become_empty_pollset(pollset); + } +} + +static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, + gpr_timespec deadline, + gpr_timespec now, + int allow_synchronous_callback) { + struct pollfd pfd[2]; + grpc_fd *fd; + int timeout; + int r; + + if (pollset->counter) { + return 0; + } + fd = pollset->data.ptr; + if (grpc_fd_is_orphaned(fd)) { + grpc_fd_unref(fd); + become_empty_pollset(pollset); + return 0; + } + if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { + timeout = -1; + } else { + timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); + if (timeout <= 0) { + return 1; + } + } + pfd[0].fd = grpc_kick_read_fd(pollset); + pfd[0].events = POLLIN; + pfd[0].revents = 0; + pfd[1].fd = fd->fd; + pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT); + pfd[1].revents = 0; + pollset->counter = 1; + gpr_mu_unlock(&pollset->mu); + + r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout); + if (r < 0) { + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + } else if (r == 0) { + /* do nothing */ + } else { + if (pfd[0].revents & POLLIN) { + grpc_kick_drain(pollset); + } + if (pfd[1].revents & POLLIN) { + grpc_fd_become_readable(fd, allow_synchronous_callback); + } + if (pfd[1].revents & POLLOUT) { + grpc_fd_become_writable(fd, allow_synchronous_callback); + } + } + + gpr_mu_lock(&pollset->mu); + grpc_fd_end_poll(fd, pollset); + pollset->counter = 0; + gpr_cv_broadcast(&pollset->cv); + return 1; +} + +static void unary_poll_pollset_destroy(grpc_pollset *pollset) { + GPR_ASSERT(pollset->counter == 0); + grpc_fd_unref(pollset->data.ptr); +} + +static const grpc_pollset_vtable unary_poll_pollset = { + unary_poll_pollset_add_fd, unary_poll_pollset_del_fd, + unary_poll_pollset_maybe_work, unary_poll_pollset_destroy}; + +static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd) { + pollset->vtable = &unary_poll_pollset; + pollset->counter = 0; + pollset->data.ptr = fd; + grpc_fd_ref(fd); +} diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h new file mode 100644 index 0000000000..f051079f5b --- /dev/null +++ b/src/core/iomgr/pollset_posix.h @@ -0,0 +1,95 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ +#define __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ + +#include <grpc/support/sync.h> + +typedef struct grpc_pollset_vtable grpc_pollset_vtable; + +/* forward declare only in this file to avoid leaking impl details via + pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not + use the struct tag */ +struct grpc_fd; + +typedef struct grpc_pollset { + /* pollsets under posix can mutate representation as fds are added and + removed. + For example, we may choose a poll() based implementation on linux for + few fds, and an epoll() based implementation for many fds */ + const grpc_pollset_vtable *vtable; + gpr_mu mu; + gpr_cv cv; + int counter; + union { + int fd; + void *ptr; + } data; +} grpc_pollset; + +struct grpc_pollset_vtable { + void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd); + void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd); + int (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline, + gpr_timespec now, int allow_synchronous_callback); + void (*destroy)(grpc_pollset *pollset); +}; + +#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) +#define GRPC_POLLSET_CV(pollset) (&(pollset)->cv) + +/* Add an fd to a pollset */ +void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd); +/* Force remove an fd from a pollset (normally they are removed on the next + poll after an fd is orphaned) */ +void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd); + +/* Force any current pollers to break polling */ +void grpc_pollset_force_kick(grpc_pollset *pollset); +/* Returns the fd to listen on for kicks */ +int grpc_kick_read_fd(grpc_pollset *p); +/* Call after polling has been kicked to leave the kicked state */ +void grpc_kick_drain(grpc_pollset *p); + +/* All fds get added to a backup pollset to ensure that progress is made + regardless of applications listening to events. Relying on this is slow + however (the backup pollset only listens every 100ms or so) - so it's not + to be relied on. */ +grpc_pollset *grpc_backup_pollset(); + +/* turn a pollset into a multipoller: platform specific */ +void grpc_platform_become_multipoller(grpc_pollset *pollset, + struct grpc_fd **fds, size_t fd_count); + +#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index a0a04297eb..c9c2c5378a 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -41,7 +41,7 @@ #include <unistd.h> #include <string.h> -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include <grpc/support/alloc.h> @@ -201,7 +201,7 @@ static void do_request(void *rp) { gpr_free(r->default_port); gpr_free(r); cb(arg, resolved); - grpc_iomgr_ref_address_resolution(-1); + grpc_iomgr_unref(); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { @@ -213,7 +213,7 @@ void grpc_resolve_address(const char *name, const char *default_port, grpc_resolve_cb cb, void *arg) { request *r = gpr_malloc(sizeof(request)); gpr_thd_id id; - grpc_iomgr_ref_address_resolution(1); + grpc_iomgr_ref(); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->cb = cb; diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 88b599b582..d675c2dcec 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -38,7 +38,9 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/iomgr_posix.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" @@ -49,8 +51,11 @@ typedef struct { void (*cb)(void *arg, grpc_endpoint *tcp); void *cb_arg; + gpr_mu mu; grpc_fd *fd; gpr_timespec deadline; + grpc_alarm alarm; + int refs; } async_connect; static int prepare_socket(int fd) { @@ -74,21 +79,42 @@ error: return 0; } -static void on_writable(void *acp, grpc_iomgr_cb_status status) { +static void on_alarm(void *acp, int success) { + int done; + async_connect *ac = acp; + gpr_mu_lock(&ac->mu); + if (ac->fd != NULL && success) { + grpc_fd_shutdown(ac->fd); + } + done = (--ac->refs == 0); + gpr_mu_unlock(&ac->mu); + if (done) { + gpr_mu_destroy(&ac->mu); + gpr_free(ac); + } +} + +static void on_writable(void *acp, int success) { async_connect *ac = acp; int so_error = 0; socklen_t so_error_size; int err; - int fd = grpc_fd_get(ac->fd); + int fd = ac->fd->fd; + int done; + grpc_endpoint *ep = NULL; + void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; + void *cb_arg = ac->cb_arg; + + grpc_alarm_cancel(&ac->alarm); - if (status == GRPC_CALLBACK_SUCCESS) { + if (success) { do { so_error_size = sizeof(so_error); err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); } while (err < 0 && errno == EINTR); if (err < 0) { gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno)); - goto error; + goto finish; } else if (so_error != 0) { if (so_error == ENOBUFS) { /* We will get one of these errors if we have run out of @@ -106,7 +132,7 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) { opened too many network connections. The "easy" fix: don't do that! */ gpr_log(GPR_ERROR, "kernel out of buffers"); - grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline); + grpc_fd_notify_on_write(ac->fd, on_writable, ac); return; } else { switch (so_error) { @@ -117,27 +143,31 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) { gpr_log(GPR_ERROR, "socket error: %d", so_error); break; } - goto error; + goto finish; } } else { - goto great_success; + ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + goto finish; } } else { - gpr_log(GPR_ERROR, "on_writable failed during connect: status=%d", status); - goto error; + gpr_log(GPR_ERROR, "on_writable failed during connect"); + goto finish; } abort(); -error: - ac->cb(ac->cb_arg, NULL); - grpc_fd_destroy(ac->fd, NULL, NULL); - gpr_free(ac); - return; - -great_success: - ac->cb(ac->cb_arg, grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); - gpr_free(ac); +finish: + gpr_mu_lock(&ac->mu); + if (!ep) { + grpc_fd_orphan(ac->fd, NULL, NULL); + } + done = (--ac->refs == 0); + gpr_mu_unlock(&ac->mu); + if (done) { + gpr_mu_destroy(&ac->mu); + gpr_free(ac); + } + cb(cb_arg, ep); } void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), @@ -176,6 +206,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), } while (err < 0 && errno == EINTR); if (err >= 0) { + gpr_log(GPR_DEBUG, "instant connect"); cb(arg, grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); return; @@ -191,7 +222,10 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac = gpr_malloc(sizeof(async_connect)); ac->cb = cb; ac->cb_arg = arg; - ac->deadline = deadline; ac->fd = grpc_fd_create(fd); - grpc_fd_notify_on_write(ac->fd, on_writable, ac, deadline); + gpr_mu_init(&ac->mu); + ac->refs = 2; + + grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); + grpc_fd_notify_on_write(ac->fd, on_writable, ac); } diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index bc3ce69e47..657f34aaf9 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -255,18 +255,14 @@ typedef struct { grpc_endpoint_read_cb read_cb; void *read_user_data; - gpr_timespec read_deadline; grpc_endpoint_write_cb write_cb; void *write_user_data; - gpr_timespec write_deadline; grpc_tcp_slice_state write_state; } grpc_tcp; -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, - grpc_iomgr_cb_status status); -static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, - grpc_iomgr_cb_status status); +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); +static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success); static void grpc_tcp_shutdown(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; @@ -276,7 +272,7 @@ static void grpc_tcp_shutdown(grpc_endpoint *ep) { static void grpc_tcp_unref(grpc_tcp *tcp) { int refcount_zero = gpr_unref(&tcp->refcount); if (refcount_zero) { - grpc_fd_destroy(tcp->em_fd, NULL, NULL); + grpc_fd_orphan(tcp->em_fd, NULL, NULL); gpr_free(tcp); } } @@ -308,8 +304,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, #define INLINE_SLICE_BUFFER_SIZE 8 #define MAX_READ_IOVEC 4 -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, - grpc_iomgr_cb_status status) { +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { grpc_tcp *tcp = (grpc_tcp *)arg; int iov_size = 1; gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE]; @@ -324,18 +319,12 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE, 0); - if (status == GRPC_CALLBACK_CANCELLED) { + if (!success) { call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); grpc_tcp_unref(tcp); return; } - if (status == GRPC_CALLBACK_TIMED_OUT) { - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_TIMED_OUT); - grpc_tcp_unref(tcp); - return; - } - /* TODO(klempner): Limit the amount we read at once. */ for (;;) { allocated_bytes = slice_state_append_blocks_into_iovec( @@ -377,8 +366,7 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, } else { /* Spurious read event, consume it here */ slice_state_destroy(&read_state); - grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, - tcp->read_deadline); + grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp); } } else { /* TODO(klempner): Log interesting errors */ @@ -407,14 +395,13 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, } static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data, gpr_timespec deadline) { + void *user_data) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_user_data = user_data; - tcp->read_deadline = deadline; gpr_ref(&tcp->refcount); - grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, deadline); + grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp); } #define MAX_WRITE_IOVEC 16 @@ -460,34 +447,24 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) { }; } -static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, - grpc_iomgr_cb_status status) { +static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) { grpc_tcp *tcp = (grpc_tcp *)arg; grpc_endpoint_write_status write_status; grpc_endpoint_cb_status cb_status; grpc_endpoint_write_cb cb; - cb_status = GRPC_ENDPOINT_CB_OK; - - if (status == GRPC_CALLBACK_CANCELLED) { - cb_status = GRPC_ENDPOINT_CB_SHUTDOWN; - } else if (status == GRPC_CALLBACK_TIMED_OUT) { - cb_status = GRPC_ENDPOINT_CB_TIMED_OUT; - } - - if (cb_status != GRPC_ENDPOINT_CB_OK) { + if (!success) { slice_state_destroy(&tcp->write_state); cb = tcp->write_cb; tcp->write_cb = NULL; - cb(tcp->write_user_data, cb_status); + cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN); grpc_tcp_unref(tcp); return; } write_status = grpc_tcp_flush(tcp); if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { - grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, - tcp->write_deadline); + grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp); } else { slice_state_destroy(&tcp->write_state); if (write_status == GRPC_ENDPOINT_WRITE_DONE) { @@ -502,9 +479,11 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, } } -static grpc_endpoint_write_status grpc_tcp_write( - grpc_endpoint *ep, gpr_slice *slices, size_t nslices, - grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) { +static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, + gpr_slice *slices, + size_t nslices, + grpc_endpoint_write_cb cb, + void *user_data) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_endpoint_write_status status; @@ -530,17 +509,15 @@ static grpc_endpoint_write_status grpc_tcp_write( gpr_ref(&tcp->refcount); tcp->write_cb = cb; tcp->write_user_data = user_data; - tcp->write_deadline = deadline; - grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, - tcp->write_deadline); + grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp); } return status; } static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { - /* tickle the pollset so we crash if things aren't wired correctly */ - pollset->unused++; + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_pollset_add_fd(pollset, tcp->em_fd); } static const grpc_endpoint_vtable vtable = { @@ -550,14 +527,12 @@ static const grpc_endpoint_vtable vtable = { grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; - tcp->fd = grpc_fd_get(em_fd); + tcp->fd = em_fd->fd; tcp->read_cb = NULL; tcp->write_cb = NULL; tcp->read_user_data = NULL; tcp->write_user_data = NULL; tcp->slice_size = slice_size; - tcp->read_deadline = gpr_inf_future; - tcp->write_deadline = gpr_inf_future; slice_state_init(&tcp->write_state, NULL, 0, 0); /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h index 830394d534..c3eef1b4b7 100644 --- a/src/core/iomgr/tcp_posix.h +++ b/src/core/iomgr/tcp_posix.h @@ -45,7 +45,7 @@ */ #include "src/core/iomgr/endpoint.h" -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/fd_posix.h" #define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 46fba13f90..1968246b75 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -49,8 +49,8 @@ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep); grpc_tcp_server *grpc_tcp_server_create(); /* Start listening to bound ports */ -void grpc_tcp_server_start(grpc_tcp_server *server, grpc_tcp_server_cb cb, - void *cb_arg); +void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset *pollset, + grpc_tcp_server_cb cb, void *cb_arg); /* Add a port to the server, returning true on success, or false otherwise. diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 2abaf15ce4..5ed517748a 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -45,7 +45,7 @@ #include <string.h> #include <errno.h> -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" @@ -97,13 +97,8 @@ grpc_tcp_server *grpc_tcp_server_create() { return s; } -static void done_destroy(void *p, grpc_iomgr_cb_status status) { - gpr_event_set(p, (void *)1); -} - void grpc_tcp_server_destroy(grpc_tcp_server *s) { size_t i; - gpr_event fd_done; gpr_mu_lock(&s->mu); /* shutdown all fd's */ for (i = 0; i < s->nports; i++) { @@ -118,9 +113,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s) { /* delete ALL the things */ for (i = 0; i < s->nports; i++) { server_port *sp = &s->ports[i]; - gpr_event_init(&fd_done); - grpc_fd_destroy(sp->emfd, done_destroy, &fd_done); - gpr_event_wait(&fd_done, gpr_inf_future); + grpc_fd_orphan(sp->emfd, NULL, NULL); } gpr_free(s->ports); gpr_free(s); @@ -196,10 +189,10 @@ error: } /* event manager callback when reads are ready */ -static void on_read(void *arg, grpc_iomgr_cb_status status) { +static void on_read(void *arg, int success) { server_port *sp = arg; - if (status != GRPC_CALLBACK_SUCCESS) { + if (!success) { goto error; } @@ -215,7 +208,7 @@ static void on_read(void *arg, grpc_iomgr_cb_status status) { case EINTR: continue; case EAGAIN: - grpc_fd_notify_on_read(sp->emfd, on_read, sp, gpr_inf_future); + grpc_fd_notify_on_read(sp->emfd, on_read, sp); return; default: gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); @@ -254,15 +247,10 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, s->ports = gpr_realloc(s->ports, sizeof(server_port *) * s->port_capacity); } sp = &s->ports[s->nports++]; - sp->emfd = grpc_fd_create(fd); - sp->fd = fd; sp->server = s; - /* initialize the em desc */ - if (sp->emfd == NULL) { - s->nports--; - gpr_mu_unlock(&s->mu); - return 0; - } + sp->fd = fd; + sp->emfd = grpc_fd_create(fd); + GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); return 1; @@ -319,8 +307,8 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, int index) { return (0 <= index && index < s->nports) ? s->ports[index].fd : -1; } -void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb, - void *cb_arg) { +void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset, + grpc_tcp_server_cb cb, void *cb_arg) { size_t i; GPR_ASSERT(cb); gpr_mu_lock(&s->mu); @@ -329,8 +317,10 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb, s->cb = cb; s->cb_arg = cb_arg; for (i = 0; i < s->nports; i++) { - grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i], - gpr_inf_future); + if (pollset) { + grpc_pollset_add_fd(pollset, s->ports[i].emfd); + } + grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i]); s->active_ports++; } gpr_mu_unlock(&s->mu); diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index 442d2fa624..c99ac8021d 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -555,12 +555,11 @@ static int fake_oauth2_has_request_metadata_only( return 1; } -void on_simulated_token_fetch_done(void *user_data, - grpc_iomgr_cb_status status) { +void on_simulated_token_fetch_done(void *user_data, int success) { grpc_credentials_metadata_request *r = (grpc_credentials_metadata_request *)user_data; grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)r->creds; - GPR_ASSERT(status == GRPC_CALLBACK_SUCCESS); + GPR_ASSERT(success); r->cb(r->user_data, &c->access_token_md, 1, GRPC_CREDENTIALS_OK); grpc_credentials_metadata_request_destroy(r); } diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index cab09ca49d..7f0fdf73c9 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -184,8 +184,7 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices, } static void endpoint_notify_on_read(grpc_endpoint *secure_ep, - grpc_endpoint_read_cb cb, void *user_data, - gpr_timespec deadline) { + grpc_endpoint_read_cb cb, void *user_data) { secure_endpoint *ep = (secure_endpoint *)secure_ep; ep->read_cb = cb; ep->read_user_data = user_data; @@ -200,7 +199,7 @@ static void endpoint_notify_on_read(grpc_endpoint *secure_ep, return; } - grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep, deadline); + grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep); } static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, @@ -217,9 +216,11 @@ static void on_write(void *data, grpc_endpoint_cb_status error) { secure_endpoint_unref(ep); } -static grpc_endpoint_write_status endpoint_write( - grpc_endpoint *secure_ep, gpr_slice *slices, size_t nslices, - grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) { +static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep, + gpr_slice *slices, + size_t nslices, + grpc_endpoint_write_cb cb, + void *user_data) { int i = 0; int output_buffer_count = 0; tsi_result result = TSI_OK; @@ -308,7 +309,7 @@ static grpc_endpoint_write_status endpoint_write( /* Need to keep the endpoint alive across a transport */ secure_endpoint_ref(ep); status = grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices, - output_buffer_count, on_write, ep, deadline); + output_buffer_count, on_write, ep); if (status != GRPC_ENDPOINT_WRITE_PENDING) { secure_endpoint_unref(ep); } diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c index eb11251912..3df91ed8e7 100644 --- a/src/core/security/secure_transport_setup.c +++ b/src/core/security/secure_transport_setup.c @@ -105,6 +105,7 @@ static void check_peer(grpc_secure_transport_setup *s) { grpc_security_status peer_status; tsi_peer peer; tsi_result result = tsi_handshaker_extract_peer(s->handshaker, &peer); + if (result != TSI_OK) { gpr_log(GPR_ERROR, "Peer extraction failed with error %s", tsi_result_to_string(result)); @@ -152,9 +153,8 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) { gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset); /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - write_status = - grpc_endpoint_write(s->endpoint, &to_send, 1, - on_handshake_data_sent_to_peer, s, gpr_inf_future); + write_status = grpc_endpoint_write(s->endpoint, &to_send, 1, + on_handshake_data_sent_to_peer, s); if (write_status == GRPC_ENDPOINT_WRITE_ERROR) { gpr_log(GPR_ERROR, "Could not send handshake data to peer."); secure_transport_setup_done(s, 0); @@ -200,8 +200,7 @@ static void on_handshake_data_received_from_peer( /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ grpc_endpoint_notify_on_read(s->endpoint, - on_handshake_data_received_from_peer, setup, - gpr_inf_future); + on_handshake_data_received_from_peer, setup); cleanup_slices(slices, nslices); return; } else { @@ -258,8 +257,7 @@ static void on_handshake_data_sent_to_peer(void *setup, /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ grpc_endpoint_notify_on_read(s->endpoint, - on_handshake_data_received_from_peer, setup, - gpr_inf_future); + on_handshake_data_received_from_peer, setup); } else { check_peer(s); } diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 28b56dd4c9..9d7c0e5e5a 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -77,9 +77,9 @@ static void on_accept(void *server, grpc_endpoint *tcp) { /* Note: the following code is the same with server_chttp2.c */ /* Server callback: start listening on our ports */ -static void start(grpc_server *server, void *tcpp) { +static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(tcp, on_accept, server); + grpc_tcp_server_start(tcp, pollset, on_accept, server); } /* Server callback: destroy the tcp listener (so we don't generate further diff --git a/src/core/support/time.c b/src/core/support/time.c index 712bdf441c..5330092f56 100644 --- a/src/core/support/time.c +++ b/src/core/support/time.c @@ -249,3 +249,18 @@ gpr_timespec gpr_timespec_from_timeval(struct timeval t) { ts.tv_nsec = t.tv_usec * 1000; return ts; } + +gpr_int32 gpr_time_to_millis(gpr_timespec t) { + if (t.tv_sec >= 2147483) { + if (t.tv_sec == 2147483 && t.tv_nsec < 648 * GPR_NS_PER_MS) { + return 2147483 * GPR_MS_PER_SEC + t.tv_nsec / GPR_NS_PER_MS; + } + return 2147483647; + } else if (t.tv_sec <= -2147483) { + /* TODO(ctiller): correct handling here (it's so far in the past do we + care?) */ + return -2147483648; + } else { + return t.tv_sec * GPR_MS_PER_SEC + t.tv_nsec / GPR_NS_PER_MS; + } +} diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 9ed617f665..9c5f5064eb 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -878,9 +878,9 @@ grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) { return &call->incoming_metadata; } -static void call_alarm(void *arg, grpc_iomgr_cb_status status) { +static void call_alarm(void *arg, int success) { grpc_call *call = arg; - if (status == GRPC_CALLBACK_SUCCESS) { + if (success) { grpc_call_cancel(call); } grpc_call_internal_unref(call); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 4837f5b978..b59c36e03a 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -36,7 +36,7 @@ #include <stdio.h> #include <string.h> -#include "src/core/iomgr/iomgr_completion_queue_interface.h" +#include "src/core/iomgr/pollset.h" #include "src/core/surface/call.h" #include "src/core/surface/event_string.h" #include "src/core/surface/surface_trace.h" @@ -61,6 +61,7 @@ typedef struct event { /* Completion queue structure */ struct grpc_completion_queue { + /* TODO(ctiller): see if this can be removed */ int allow_polling; /* When refs drops to zero, we are in shutdown mode, and will be destroyable @@ -100,7 +101,7 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) { /* Create and append an event to the queue. Returns the event so that its data members can be filled in. - Requires grpc_iomgr_mu locked. */ + Requires GRPC_POLLSET_MU(&cc->pollset) locked. */ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data) { @@ -126,7 +127,8 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, ev->bucket_prev = cc->buckets[bucket]->bucket_prev; ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev; } - gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); + grpc_pollset_kick(&cc->pollset); return ev; } @@ -149,7 +151,7 @@ static void end_op_locked(grpc_completion_queue *cc, if (gpr_unref(&cc->refs)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); } } @@ -157,11 +159,11 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data, grpc_byte_buffer *read) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data); ev->base.data.read = read; end_op_locked(cc, GRPC_READ); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, @@ -169,11 +171,11 @@ void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.invoke_accepted = error; end_op_locked(cc, GRPC_INVOKE_ACCEPTED); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, @@ -181,11 +183,11 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.write_accepted = error; end_op_locked(cc, GRPC_WRITE_ACCEPTED); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, @@ -193,11 +195,11 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.finish_accepted = error; end_op_locked(cc, GRPC_FINISH_ACCEPTED); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, @@ -206,13 +208,13 @@ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, void *user_data, size_t count, grpc_metadata *elements) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish, user_data); ev->base.data.client_metadata_read.count = count; ev->base.data.client_metadata_read.elements = elements; end_op_locked(cc, GRPC_CLIENT_METADATA_READ); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, @@ -221,14 +223,14 @@ void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_metadata *metadata_elements, size_t metadata_count) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data); ev->base.data.finished.status = status; ev->base.data.finished.details = details; ev->base.data.finished.metadata_count = metadata_count; ev->base.data.finished.metadata_elements = metadata_elements; end_op_locked(cc, GRPC_FINISHED); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, @@ -237,7 +239,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, gpr_timespec deadline, size_t metadata_count, grpc_metadata *metadata_elements) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data); ev->base.data.server_rpc_new.method = method; ev->base.data.server_rpc_new.host = host; @@ -245,7 +247,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, ev->base.data.server_rpc_new.metadata_count = metadata_count; ev->base.data.server_rpc_new.metadata_elements = metadata_elements; end_op_locked(cc, GRPC_SERVER_RPC_NEW); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } /* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ @@ -262,7 +264,7 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline) { event *ev = NULL; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { if (cc->queue != NULL) { gpr_uintptr bucket; @@ -288,15 +290,16 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_iomgr_work(deadline)) { + if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) { continue; } - if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) { - gpr_mu_unlock(&grpc_iomgr_mu); + if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), + GRPC_POLLSET_MU(&cc->pollset), deadline)) { + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); return NULL; } } - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); return &ev->base; } @@ -334,7 +337,7 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec deadline) { event *ev = NULL; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { if ((ev = pluck_event(cc, tag))) { break; @@ -343,15 +346,16 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_iomgr_work(deadline)) { + if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) { continue; } - if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) { - gpr_mu_unlock(&grpc_iomgr_mu); + if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), + GRPC_POLLSET_MU(&cc->pollset), deadline)) { + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); return NULL; } } - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); return &ev->base; } @@ -360,11 +364,11 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->refs)) { - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_cv_broadcast(&grpc_iomgr_cv); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 3829e7aa8f..aa544a97f2 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -52,7 +52,7 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list; typedef struct listener { void *arg; - void (*start)(grpc_server *server, void *arg); + void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset); void (*destroy)(grpc_server *server, void *arg); struct listener *next; } listener; @@ -192,7 +192,7 @@ static void orphan_channel(channel_data *chand) { chand->next = chand->prev = chand; } -static void finish_destroy_channel(void *cd, grpc_iomgr_cb_status status) { +static void finish_destroy_channel(void *cd, int success) { channel_data *chand = cd; grpc_server *server = chand->server; /*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/ @@ -247,7 +247,7 @@ static void start_new_rpc(grpc_call_element *elem) { gpr_mu_unlock(&server->mu); } -static void kill_zombie(void *elem, grpc_iomgr_cb_status status) { +static void kill_zombie(void *elem, int success) { grpc_call_destroy(grpc_call_from_top_element(elem)); } @@ -336,7 +336,7 @@ static void channel_op(grpc_channel_element *elem, } } -static void finish_shutdown_channel(void *cd, grpc_iomgr_cb_status status) { +static void finish_shutdown_channel(void *cd, int success) { channel_data *chand = cd; grpc_channel_op op; op.type = GRPC_CHANNEL_DISCONNECT; @@ -468,7 +468,7 @@ void grpc_server_start(grpc_server *server) { listener *l; for (l = server->listeners; l; l = l->next) { - l->start(server, l->arg); + l->start(server, l->arg, grpc_cq_pollset(server->cq)); } } @@ -596,7 +596,8 @@ void grpc_server_destroy(grpc_server *server) { } void grpc_server_add_listener(grpc_server *server, void *arg, - void (*start)(grpc_server *server, void *arg), + void (*start)(grpc_server *server, void *arg, + grpc_pollset *pollset), void (*destroy)(grpc_server *server, void *arg)) { listener *l = gpr_malloc(sizeof(listener)); l->arg = arg; diff --git a/src/core/surface/server.h b/src/core/surface/server.h index f0773ab9d5..61292ebe4e 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -47,7 +47,8 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, /* Add a listener to the server: when the server starts, it will call start, and when it shuts down, it will call destroy */ void grpc_server_add_listener(grpc_server *server, void *listener, - void (*start)(grpc_server *server, void *arg), + void (*start)(grpc_server *server, void *arg, + grpc_pollset *pollset), void (*destroy)(grpc_server *server, void *arg)); /* Setup a transport - creates a channel stack, binds the transport to the diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index a5fdd03774..a0961bd449 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -59,9 +59,9 @@ static void new_transport(void *server, grpc_endpoint *tcp) { } /* Server callback: start listening on our ports */ -static void start(grpc_server *server, void *tcpp) { +static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(tcp, new_transport, server); + grpc_tcp_server_start(tcp, pollset, new_transport, server); } /* Server callback: destroy the tcp listener (so we don't generate further diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index a8ae8cc5bc..5bf763e76f 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -711,7 +711,7 @@ static void unlock(transport *t) { /* write some bytes if necessary */ while (start_write) { switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count, - finish_write, t, gpr_inf_future)) { + finish_write, t)) { case GRPC_ENDPOINT_WRITE_DONE: /* grab the lock directly without wrappers since we just want to continue writes if we loop: no need to check read callbacks again */ @@ -1617,7 +1617,6 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, case GRPC_ENDPOINT_CB_SHUTDOWN: case GRPC_ENDPOINT_CB_EOF: case GRPC_ENDPOINT_CB_ERROR: - case GRPC_ENDPOINT_CB_TIMED_OUT: lock(t); drop_connection(t); t->reading = 0; @@ -1642,7 +1641,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]); if (keep_reading) { - grpc_endpoint_notify_on_read(t->ep, recv_data, t, gpr_inf_future); + grpc_endpoint_notify_on_read(t->ep, recv_data, t); } } diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c index 1181f1b4de..e5b7304743 100644 --- a/test/core/end2end/cq_verifier.c +++ b/test/core/end2end/cq_verifier.c @@ -371,7 +371,12 @@ void cq_verify_empty(cq_verifier *v) { GPR_ASSERT(v->expect.next == &v->expect && "expectation queue must be empty"); ev = grpc_completion_queue_next(v->cq, deadline); - GPR_ASSERT(ev == NULL); + if (ev != NULL) { + char *s = grpc_event_string(ev); + gpr_log(GPR_ERROR, "unexpected event (expected nothing): %s", s); + gpr_free(s); + abort(); + } } static expectation *add(cq_verifier *v, grpc_completion_type type, void *tag) { diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index b3fac796f4..cb5c6f7cad 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -120,6 +120,7 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, GPR_ASSERT(!f->server); f->server = grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args); + grpc_server_start(f->server); grpc_create_chttp2_transport(server_setup_transport, f, server_args, sfd->server, NULL, 0, grpc_mdctx_create(), 0); } diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c index 061e049a09..84acfa6d6c 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c @@ -120,6 +120,7 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, GPR_ASSERT(!f->server); f->server = grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args); + grpc_server_start(f->server); grpc_create_chttp2_transport(server_setup_transport, f, server_args, sfd->server, NULL, 0, grpc_mdctx_create(), 0); } diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c index d15fef6adc..bad86fb9dc 100644 --- a/test/core/end2end/tests/invoke_large_request.c +++ b/test/core/end2end/tests/invoke_large_request.c @@ -112,7 +112,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) { gpr_slice request_payload_slice = large_slice(); grpc_byte_buffer *request_payload = grpc_byte_buffer_create(&request_payload_slice, 1); - gpr_timespec deadline = n_seconds_time(10); + gpr_timespec deadline = n_seconds_time(30); grpc_end2end_test_fixture f = begin_test(config, __FUNCTION__, NULL, NULL); cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c index 08198d49fb..a418d1b15f 100644 --- a/test/core/end2end/tests/max_concurrent_streams.c +++ b/test/core/end2end/tests/max_concurrent_streams.c @@ -156,9 +156,12 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { grpc_call *c2; grpc_call *s1; grpc_call *s2; + int live_call; + grpc_call *live_call_obj; gpr_timespec deadline; cq_verifier *v_client; cq_verifier *v_server; + grpc_event *ev; server_arg.key = GRPC_ARG_MAX_CONCURRENT_STREAMS; server_arg.type = GRPC_ARG_INTEGER; @@ -180,9 +183,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { /* start two requests - ensuring that the second is not accepted until the first completes */ deadline = five_seconds_time(); - c1 = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c1 = + grpc_channel_create_call(f.client, "/alpha", "test.google.com", deadline); GPR_ASSERT(c1); - c2 = grpc_channel_create_call(f.client, "/bar", "test.google.com", deadline); + c2 = grpc_channel_create_call(f.client, "/beta", "test.google.com", deadline); GPR_ASSERT(c1); GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); @@ -191,19 +195,29 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { tag(301), tag(302), 0)); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(c2, f.client_cq, tag(400), tag(401), tag(402), 0)); - cq_expect_invoke_accepted(v_client, tag(300), GRPC_OP_OK); - cq_verify(v_client); + ev = grpc_completion_queue_next( + f.client_cq, gpr_time_add(gpr_now(), gpr_time_from_seconds(10))); + GPR_ASSERT(ev); + GPR_ASSERT(ev->type == GRPC_INVOKE_ACCEPTED); + GPR_ASSERT(ev->data.invoke_accepted == GRPC_OP_OK); + /* The /alpha or /beta calls started above could be invoked (but NOT both); + * check this here */ + live_call = (int)(gpr_intptr)ev->tag; + live_call_obj = live_call == 300 ? c1 : c2; + grpc_event_finish(ev); - GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c1, tag(303))); - cq_expect_finish_accepted(v_client, tag(303), GRPC_OP_OK); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_writes_done(live_call_obj, tag(live_call + 3))); + cq_expect_finish_accepted(v_client, tag(live_call + 3), GRPC_OP_OK); cq_verify(v_client); - cq_expect_server_rpc_new(v_server, &s1, tag(100), "/foo", "test.google.com", - deadline, NULL); + cq_expect_server_rpc_new(v_server, &s1, tag(100), + live_call == 300 ? "/alpha" : "/beta", + "test.google.com", deadline, NULL); cq_verify(v_server); GPR_ASSERT(GRPC_CALL_OK == grpc_call_accept(s1, f.server_cq, tag(102), 0)); - cq_expect_client_metadata_read(v_client, tag(301), NULL); + cq_expect_client_metadata_read(v_client, tag(live_call + 1), NULL); cq_verify(v_client); GPR_ASSERT(GRPC_CALL_OK == @@ -214,22 +228,26 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { cq_verify(v_server); /* first request is finished, we should be able to start the second */ - cq_expect_finished_with_status(v_client, tag(302), GRPC_STATUS_UNIMPLEMENTED, - "xyz", NULL); - cq_expect_invoke_accepted(v_client, tag(400), GRPC_OP_OK); + cq_expect_finished_with_status(v_client, tag(live_call + 2), + GRPC_STATUS_UNIMPLEMENTED, "xyz", NULL); + live_call = (live_call == 300) ? 400 : 300; + live_call_obj = live_call == 300 ? c1 : c2; + cq_expect_invoke_accepted(v_client, tag(live_call), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c2, tag(403))); - cq_expect_finish_accepted(v_client, tag(403), GRPC_OP_OK); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_writes_done(live_call_obj, tag(live_call + 3))); + cq_expect_finish_accepted(v_client, tag(live_call + 3), GRPC_OP_OK); cq_verify(v_client); GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(200))); - cq_expect_server_rpc_new(v_server, &s2, tag(200), "/bar", "test.google.com", - deadline, NULL); + cq_expect_server_rpc_new(v_server, &s2, tag(200), + live_call == 300 ? "/alpha" : "/beta", + "test.google.com", deadline, NULL); cq_verify(v_server); GPR_ASSERT(GRPC_CALL_OK == grpc_call_accept(s2, f.server_cq, tag(202), 0)); - cq_expect_client_metadata_read(v_client, tag(401), NULL); + cq_expect_client_metadata_read(v_client, tag(live_call + 1), NULL); cq_verify(v_client); GPR_ASSERT(GRPC_CALL_OK == @@ -239,8 +257,8 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { cq_expect_finished(v_server, tag(202), NULL); cq_verify(v_server); - cq_expect_finished_with_status(v_client, tag(402), GRPC_STATUS_UNIMPLEMENTED, - "xyz", NULL); + cq_expect_finished_with_status(v_client, tag(live_call + 2), + GRPC_STATUS_UNIMPLEMENTED, "xyz", NULL); cq_verify(v_client); cq_verifier_destroy(v_client); diff --git a/test/core/iomgr/alarm_list_test.c b/test/core/iomgr/alarm_list_test.c index a8aa6126e6..686d21d705 100644 --- a/test/core/iomgr/alarm_list_test.c +++ b/test/core/iomgr/alarm_list_test.c @@ -41,13 +41,13 @@ #define MAX_CB 30 -static int cb_called[MAX_CB][GRPC_CALLBACK_DO_NOT_USE]; +static int cb_called[MAX_CB][2]; static int kicks; void grpc_kick_poller() { ++kicks; } -static void cb(void *arg, grpc_iomgr_cb_status status) { - cb_called[(gpr_intptr)arg][status]++; +static void cb(void *arg, int success) { + cb_called[(gpr_intptr)arg][success]++; } static void add_test() { @@ -72,36 +72,36 @@ static void add_test() { /* collect alarms. Only the first batch should be ready. */ GPR_ASSERT(10 == - grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(500)))); + grpc_alarm_check( + NULL, gpr_time_add(start, gpr_time_from_millis(500)), NULL)); for (i = 0; i < 20; i++) { - GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 10)); - GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0); - GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0); + GPR_ASSERT(cb_called[i][1] == (i < 10)); + GPR_ASSERT(cb_called[i][0] == 0); } GPR_ASSERT(0 == - grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(600)))); + grpc_alarm_check( + NULL, gpr_time_add(start, gpr_time_from_millis(600)), NULL)); for (i = 0; i < 30; i++) { - GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 10)); - GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0); - GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0); + GPR_ASSERT(cb_called[i][1] == (i < 10)); + GPR_ASSERT(cb_called[i][0] == 0); } /* collect the rest of the alarms */ GPR_ASSERT(10 == - grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(1500)))); + grpc_alarm_check( + NULL, gpr_time_add(start, gpr_time_from_millis(1500)), NULL)); for (i = 0; i < 30; i++) { - GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 20)); - GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0); - GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0); + GPR_ASSERT(cb_called[i][1] == (i < 20)); + GPR_ASSERT(cb_called[i][0] == 0); } GPR_ASSERT(0 == - grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(1600)))); + grpc_alarm_check( + NULL, gpr_time_add(start, gpr_time_from_millis(1600)), NULL)); for (i = 0; i < 30; i++) { - GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 20)); - GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0); - GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0); + GPR_ASSERT(cb_called[i][1] == (i < 20)); + GPR_ASSERT(cb_called[i][0] == 0); } grpc_alarm_list_shutdown(); @@ -124,16 +124,16 @@ void destruction_test() { (void *)(gpr_intptr)3, gpr_time_0); grpc_alarm_init(&alarms[4], gpr_time_from_millis(1), cb, (void *)(gpr_intptr)4, gpr_time_0); - GPR_ASSERT(1 == grpc_alarm_check(gpr_time_from_millis(2))); - GPR_ASSERT(1 == cb_called[4][GRPC_CALLBACK_SUCCESS]); + GPR_ASSERT(1 == grpc_alarm_check(NULL, gpr_time_from_millis(2), NULL)); + GPR_ASSERT(1 == cb_called[4][1]); grpc_alarm_cancel(&alarms[0]); grpc_alarm_cancel(&alarms[3]); - GPR_ASSERT(1 == cb_called[0][GRPC_CALLBACK_CANCELLED]); - GPR_ASSERT(1 == cb_called[3][GRPC_CALLBACK_CANCELLED]); + GPR_ASSERT(1 == cb_called[0][0]); + GPR_ASSERT(1 == cb_called[3][0]); grpc_alarm_list_shutdown(); - GPR_ASSERT(1 == cb_called[1][GRPC_CALLBACK_CANCELLED]); - GPR_ASSERT(1 == cb_called[2][GRPC_CALLBACK_CANCELLED]); + GPR_ASSERT(1 == cb_called[1][0]); + GPR_ASSERT(1 == cb_called[2][0]); } int main(int argc, char **argv) { diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c index 271c42d57e..247320de04 100644 --- a/test/core/iomgr/alarm_test.c +++ b/test/core/iomgr/alarm_test.c @@ -51,8 +51,10 @@ #include <grpc/support/time.h> #include "test/core/util/test_config.h" +#define SUCCESS_NOT_SET (-1) + /* Dummy gRPC callback */ -void no_op_cb(void *arg, grpc_iomgr_cb_status status) {} +void no_op_cb(void *arg, int success) {} typedef struct { gpr_cv cv; @@ -62,27 +64,25 @@ typedef struct { int done_cancel_ctr; int done; gpr_event fcb_arg; - grpc_iomgr_cb_status status; + int success; } alarm_arg; -static void followup_cb(void *arg, grpc_iomgr_cb_status status) { +static void followup_cb(void *arg, int success) { gpr_event_set((gpr_event *)arg, arg); } /* Called when an alarm expires. */ -static void alarm_cb(void *arg /* alarm_arg */, grpc_iomgr_cb_status status) { +static void alarm_cb(void *arg /* alarm_arg */, int success) { alarm_arg *a = arg; gpr_mu_lock(&a->mu); - if (status == GRPC_CALLBACK_SUCCESS) { + if (success) { a->counter++; a->done_success_ctr++; - } else if (status == GRPC_CALLBACK_CANCELLED) { - a->done_cancel_ctr++; } else { - GPR_ASSERT(0); + a->done_cancel_ctr++; } a->done = 1; - a->status = status; + a->success = success; gpr_cv_signal(&a->cv); gpr_mu_unlock(&a->mu); grpc_iomgr_add_callback(followup_cb, &a->fcb_arg); @@ -105,7 +105,7 @@ static void test_grpc_alarm() { grpc_iomgr_init(); arg.counter = 0; - arg.status = GRPC_CALLBACK_DO_NOT_USE; + arg.success = SUCCESS_NOT_SET; arg.done_success_ctr = 0; arg.done_cancel_ctr = 0; arg.done = 0; @@ -138,7 +138,7 @@ static void test_grpc_alarm() { } else if (arg.done_cancel_ctr != 0) { gpr_log(GPR_ERROR, "Alarm done callback called with cancel"); GPR_ASSERT(0); - } else if (arg.status == GRPC_CALLBACK_DO_NOT_USE) { + } else if (arg.success == SUCCESS_NOT_SET) { gpr_log(GPR_ERROR, "Alarm callback without status"); GPR_ASSERT(0); } else { @@ -154,7 +154,7 @@ static void test_grpc_alarm() { gpr_mu_destroy(&arg.mu); arg2.counter = 0; - arg2.status = GRPC_CALLBACK_DO_NOT_USE; + arg2.success = SUCCESS_NOT_SET; arg2.done_success_ctr = 0; arg2.done_cancel_ctr = 0; arg2.done = 0; @@ -188,7 +188,7 @@ static void test_grpc_alarm() { } else if (arg2.done_cancel_ctr + arg2.done_success_ctr != 1) { gpr_log(GPR_ERROR, "Alarm done callback called incorrect number of times"); GPR_ASSERT(0); - } else if (arg2.status == GRPC_CALLBACK_DO_NOT_USE) { + } else if (arg2.success == SUCCESS_NOT_SET) { gpr_log(GPR_ERROR, "Alarm callback without status"); GPR_ASSERT(0); } else if (arg2.done_success_ctr) { diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index 6a7f6afbc6..125cde4678 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -145,8 +145,8 @@ static void read_and_write_test_read_handler(void *data, gpr_slice *slices, gpr_cv_signal(&state->cv); gpr_mu_unlock(&state->mu); } else { - grpc_endpoint_notify_on_read( - state->read_ep, read_and_write_test_read_handler, data, gpr_inf_future); + grpc_endpoint_notify_on_read(state->read_ep, + read_and_write_test_read_handler, data); } } @@ -159,6 +159,8 @@ static void read_and_write_test_write_handler(void *data, GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR); + gpr_log(GPR_DEBUG, "%s: error=%d", __FUNCTION__, error); + if (error == GRPC_ENDPOINT_CB_SHUTDOWN) { gpr_log(GPR_INFO, "Write handler shutdown"); gpr_mu_lock(&state->mu); @@ -182,9 +184,10 @@ static void read_and_write_test_write_handler(void *data, slices = allocate_blocks(state->current_write_size, 8192, &nslices, &state->current_write_data); - write_status = grpc_endpoint_write(state->write_ep, slices, nslices, - read_and_write_test_write_handler, state, - gpr_inf_future); + write_status = + grpc_endpoint_write(state->write_ep, slices, nslices, + read_and_write_test_write_handler, state); + gpr_log(GPR_DEBUG, "write_status=%d", write_status); GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR); free(slices); if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { @@ -208,8 +211,7 @@ static void read_and_write_test(grpc_endpoint_test_config config, size_t num_bytes, size_t write_size, size_t slice_size, int shutdown) { struct read_and_write_test_state state; - gpr_timespec rel_deadline = {20, 0}; - gpr_timespec deadline = gpr_time_add(gpr_now(), rel_deadline); + gpr_timespec deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(20)); grpc_endpoint_test_fixture f = begin_test(config, __FUNCTION__, slice_size); if (shutdown) { @@ -241,16 +243,22 @@ static void read_and_write_test(grpc_endpoint_test_config config, read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK); grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler, - &state, gpr_inf_future); + &state); if (shutdown) { + gpr_log(GPR_DEBUG, "shutdown read"); grpc_endpoint_shutdown(state.read_ep); + gpr_log(GPR_DEBUG, "shutdown write"); grpc_endpoint_shutdown(state.write_ep); } gpr_mu_lock(&state.mu); while (!state.read_done || !state.write_done) { - GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); + if (gpr_cv_wait(&state.cv, &state.mu, deadline)) { + gpr_log(GPR_ERROR, "timeout: read_done=%d, write_done=%d", + state.read_done, state.write_done); + abort(); + } } gpr_mu_unlock(&state.mu); @@ -265,79 +273,6 @@ struct timeout_test_state { gpr_event io_done; }; -static void read_timeout_test_read_handler(void *data, gpr_slice *slices, - size_t nslices, - grpc_endpoint_cb_status error) { - struct timeout_test_state *state = data; - GPR_ASSERT(error == GRPC_ENDPOINT_CB_TIMED_OUT); - gpr_event_set(&state->io_done, (void *)1); -} - -static void read_timeout_test(grpc_endpoint_test_config config, - size_t slice_size) { - gpr_timespec timeout = gpr_time_from_micros(10000); - gpr_timespec read_deadline = gpr_time_add(gpr_now(), timeout); - gpr_timespec test_deadline = - gpr_time_add(gpr_now(), gpr_time_from_micros(2000000)); - struct timeout_test_state state; - grpc_endpoint_test_fixture f = begin_test(config, __FUNCTION__, slice_size); - - gpr_event_init(&state.io_done); - - grpc_endpoint_notify_on_read(f.client_ep, read_timeout_test_read_handler, - &state, read_deadline); - GPR_ASSERT(gpr_event_wait(&state.io_done, test_deadline)); - grpc_endpoint_destroy(f.client_ep); - grpc_endpoint_destroy(f.server_ep); - end_test(config); -} - -static void write_timeout_test_write_handler(void *data, - grpc_endpoint_cb_status error) { - struct timeout_test_state *state = data; - GPR_ASSERT(error == GRPC_ENDPOINT_CB_TIMED_OUT); - gpr_event_set(&state->io_done, (void *)1); -} - -static void write_timeout_test(grpc_endpoint_test_config config, - size_t slice_size) { - gpr_timespec timeout = gpr_time_from_micros(10000); - gpr_timespec write_deadline = gpr_time_add(gpr_now(), timeout); - gpr_timespec test_deadline = - gpr_time_add(gpr_now(), gpr_time_from_micros(2000000)); - struct timeout_test_state state; - int current_data = 1; - gpr_slice *slices; - size_t nblocks; - size_t size; - grpc_endpoint_test_fixture f = begin_test(config, __FUNCTION__, slice_size); - - gpr_event_init(&state.io_done); - - /* TODO(klempner): Factor this out with the equivalent code in tcp_test.c */ - for (size = 1;; size *= 2) { - slices = allocate_blocks(size, 1, &nblocks, ¤t_data); - switch (grpc_endpoint_write(f.client_ep, slices, nblocks, - write_timeout_test_write_handler, &state, - write_deadline)) { - case GRPC_ENDPOINT_WRITE_DONE: - break; - case GRPC_ENDPOINT_WRITE_ERROR: - gpr_log(GPR_ERROR, "error writing"); - abort(); - case GRPC_ENDPOINT_WRITE_PENDING: - GPR_ASSERT(gpr_event_wait(&state.io_done, test_deadline)); - gpr_free(slices); - goto exit; - } - gpr_free(slices); - } -exit: - grpc_endpoint_destroy(f.client_ep); - grpc_endpoint_destroy(f.server_ep); - end_test(config); -} - typedef struct { gpr_event ev; grpc_endpoint *ep; @@ -357,9 +292,8 @@ static void shutdown_during_write_test_read_handler( grpc_endpoint_destroy(st->ep); gpr_event_set(&st->ev, (void *)(gpr_intptr)error); } else { - grpc_endpoint_notify_on_read(st->ep, - shutdown_during_write_test_read_handler, - user_data, gpr_inf_future); + grpc_endpoint_notify_on_read( + st->ep, shutdown_during_write_test_read_handler, user_data); } } @@ -397,14 +331,13 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config, gpr_event_init(&read_st.ev); gpr_event_init(&write_st.ev); - grpc_endpoint_notify_on_read(read_st.ep, - shutdown_during_write_test_read_handler, - &read_st, gpr_inf_future); + grpc_endpoint_notify_on_read( + read_st.ep, shutdown_during_write_test_read_handler, &read_st); for (size = 1;; size *= 2) { slices = allocate_blocks(size, 1, &nblocks, ¤t_data); switch (grpc_endpoint_write(write_st.ep, slices, nblocks, shutdown_during_write_test_write_handler, - &write_st, gpr_inf_future)) { + &write_st)) { case GRPC_ENDPOINT_WRITE_DONE: break; case GRPC_ENDPOINT_WRITE_ERROR: @@ -432,7 +365,5 @@ void grpc_endpoint_tests(grpc_endpoint_test_config config) { read_and_write_test(config, 10000000, 100000, 8192, 0); read_and_write_test(config, 1000000, 100000, 1, 0); read_and_write_test(config, 100000000, 100000, 1, 1); - read_timeout_test(config, 1000); - write_timeout_test(config, 1000); shutdown_during_write_test(config, 1000); } diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index c3a0afdb25..325c9f0221 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -31,8 +31,7 @@ * */ -/* Test gRPC event manager with a simple TCP upload server and client. */ -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/fd_posix.h" #include <ctype.h> #include <errno.h> @@ -85,7 +84,7 @@ static void create_test_socket(int port, int *socket_fd, } /* Dummy gRPC callback */ -void no_op_cb(void *arg, enum grpc_em_cb_status status) {} +void no_op_cb(void *arg, int success) {} /* =======An upload server to test notify_on_read=========== The server simply reads and counts a stream of bytes. */ @@ -117,10 +116,10 @@ typedef struct { /* Called when an upload session can be safely shutdown. Close session FD and start to shutdown listen FD. */ static void session_shutdown_cb(void *arg, /*session*/ - enum grpc_em_cb_status status) { + int success) { session *se = arg; server *sv = se->sv; - grpc_fd_destroy(se->em_fd, NULL, NULL); + grpc_fd_orphan(se->em_fd, NULL, NULL); gpr_free(se); /* Start to shutdown listen fd. */ grpc_fd_shutdown(sv->em_fd); @@ -128,15 +127,15 @@ static void session_shutdown_cb(void *arg, /*session*/ /* Called when data become readable in a session. */ static void session_read_cb(void *arg, /*session*/ - enum grpc_em_cb_status status) { + int success) { session *se = arg; - int fd = grpc_fd_get(se->em_fd); + int fd = se->em_fd->fd; ssize_t read_once = 0; ssize_t read_total = 0; - if (status == GRPC_CALLBACK_CANCELLED) { - session_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS); + if (!success) { + session_shutdown_cb(arg, 1); return; } @@ -151,8 +150,7 @@ static void session_read_cb(void *arg, /*session*/ It is possible to read nothing due to spurious edge event or data has been drained, In such a case, read() returns -1 and set errno to EAGAIN. */ if (read_once == 0) { - grpc_fd_shutdown(se->em_fd); - grpc_fd_notify_on_read(se->em_fd, session_read_cb, se, gpr_inf_future); + session_shutdown_cb(arg, 1); } else if (read_once == -1) { if (errno == EAGAIN) { /* An edge triggered event is cached in the kernel until next poll. @@ -163,8 +161,7 @@ static void session_read_cb(void *arg, /*session*/ TODO(chenw): in multi-threaded version, callback and polling can be run in different threads. polling may catch a persist read edge event before notify_on_read is called. */ - GPR_ASSERT(grpc_fd_notify_on_read(se->em_fd, session_read_cb, se, - gpr_inf_future)); + grpc_fd_notify_on_read(se->em_fd, session_read_cb, se); } else { gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno)); GPR_ASSERT(0); @@ -174,11 +171,10 @@ static void session_read_cb(void *arg, /*session*/ /* Called when the listen FD can be safely shutdown. Close listen FD and signal that server can be shutdown. */ -static void listen_shutdown_cb(void *arg /*server*/, - enum grpc_em_cb_status status) { +static void listen_shutdown_cb(void *arg /*server*/, int success) { server *sv = arg; - grpc_fd_destroy(sv->em_fd, NULL, NULL); + grpc_fd_orphan(sv->em_fd, NULL, NULL); gpr_mu_lock(&sv->mu); sv->done = 1; @@ -188,21 +184,21 @@ static void listen_shutdown_cb(void *arg /*server*/, /* Called when a new TCP connection request arrives in the listening port. */ static void listen_cb(void *arg, /*=sv_arg*/ - enum grpc_em_cb_status status) { + int success) { server *sv = arg; int fd; int flags; session *se; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); - struct grpc_fd *listen_em_fd = sv->em_fd; + grpc_fd *listen_em_fd = sv->em_fd; - if (status == GRPC_CALLBACK_CANCELLED) { - listen_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS); + if (!success) { + listen_shutdown_cb(arg, 1); return; } - fd = accept(grpc_fd_get(listen_em_fd), (struct sockaddr *)&ss, &slen); + fd = accept(listen_em_fd->fd, (struct sockaddr *)&ss, &slen); GPR_ASSERT(fd >= 0); GPR_ASSERT(fd < FD_SETSIZE); flags = fcntl(fd, F_GETFL, 0); @@ -210,11 +206,9 @@ static void listen_cb(void *arg, /*=sv_arg*/ se = gpr_malloc(sizeof(*se)); se->sv = sv; se->em_fd = grpc_fd_create(fd); - GPR_ASSERT( - grpc_fd_notify_on_read(se->em_fd, session_read_cb, se, gpr_inf_future)); + grpc_fd_notify_on_read(se->em_fd, session_read_cb, se); - GPR_ASSERT( - grpc_fd_notify_on_read(listen_em_fd, listen_cb, sv, gpr_inf_future)); + grpc_fd_notify_on_read(listen_em_fd, listen_cb, sv); } /* Max number of connections pending to be accepted by listen(). */ @@ -239,7 +233,7 @@ static int server_start(server *sv) { sv->em_fd = grpc_fd_create(fd); /* Register to be interested in reading from listen_fd. */ - GPR_ASSERT(grpc_fd_notify_on_read(sv->em_fd, listen_cb, sv, gpr_inf_future)); + grpc_fd_notify_on_read(sv->em_fd, listen_cb, sv); return port; } @@ -285,25 +279,24 @@ static void client_init(client *cl) { } /* Called when a client upload session is ready to shutdown. */ -static void client_session_shutdown_cb(void *arg /*client*/, - enum grpc_em_cb_status status) { +static void client_session_shutdown_cb(void *arg /*client*/, int success) { client *cl = arg; - grpc_fd_destroy(cl->em_fd, NULL, NULL); - gpr_mu_lock(&cl->mu); + grpc_fd_orphan(cl->em_fd, NULL, NULL); cl->done = 1; gpr_cv_signal(&cl->done_cv); - gpr_mu_unlock(&cl->mu); } /* Write as much as possible, then register notify_on_write. */ static void client_session_write(void *arg, /*client*/ - enum grpc_em_cb_status status) { + int success) { client *cl = arg; - int fd = grpc_fd_get(cl->em_fd); + int fd = cl->em_fd->fd; ssize_t write_once = 0; - if (status == GRPC_CALLBACK_CANCELLED) { - client_session_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS); + if (!success) { + gpr_mu_lock(&cl->mu); + client_session_shutdown_cb(arg, 1); + gpr_mu_unlock(&cl->mu); return; } @@ -315,14 +308,10 @@ static void client_session_write(void *arg, /*client*/ if (errno == EAGAIN) { gpr_mu_lock(&cl->mu); if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { - GPR_ASSERT(grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl, - gpr_inf_future)); + grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl); cl->client_write_cnt++; } else { - close(fd); - grpc_fd_shutdown(cl->em_fd); - grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl, - gpr_inf_future); + client_session_shutdown_cb(arg, 1); } gpr_mu_unlock(&cl->mu); } else { @@ -344,7 +333,7 @@ static void client_start(client *cl, int port) { cl->em_fd = grpc_fd_create(fd); - client_session_write(cl, GRPC_CALLBACK_SUCCESS); + client_session_write(cl, 1); } /* Wait for the signal to shutdown a client. */ @@ -378,7 +367,7 @@ static void test_grpc_fd() { typedef struct fd_change_data { gpr_mu mu; gpr_cv cv; - void (*cb_that_ran)(void *, enum grpc_em_cb_status); + void (*cb_that_ran)(void *, int success); } fd_change_data; void init_change_data(fd_change_data *fdc) { @@ -392,8 +381,7 @@ void destroy_change_data(fd_change_data *fdc) { gpr_cv_destroy(&fdc->cv); } -static void first_read_callback(void *arg /* fd_change_data */, - enum grpc_em_cb_status status) { +static void first_read_callback(void *arg /* fd_change_data */, int success) { fd_change_data *fdc = arg; gpr_mu_lock(&fdc->mu); @@ -402,8 +390,7 @@ static void first_read_callback(void *arg /* fd_change_data */, gpr_mu_unlock(&fdc->mu); } -static void second_read_callback(void *arg /* fd_change_data */, - enum grpc_em_cb_status status) { +static void second_read_callback(void *arg /* fd_change_data */, int success) { fd_change_data *fdc = arg; gpr_mu_lock(&fdc->mu); @@ -436,7 +423,7 @@ static void test_grpc_fd_change() { em_fd = grpc_fd_create(sv[0]); /* Register the first callback, then make its FD readable */ - grpc_fd_notify_on_read(em_fd, first_read_callback, &a, gpr_inf_future); + grpc_fd_notify_on_read(em_fd, first_read_callback, &a); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -455,7 +442,7 @@ static void test_grpc_fd_change() { /* Now register a second callback with distinct change data, and do the same thing again. */ - grpc_fd_notify_on_read(em_fd, second_read_callback, &b, gpr_inf_future); + grpc_fd_notify_on_read(em_fd, second_read_callback, &b); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -468,48 +455,9 @@ static void test_grpc_fd_change() { GPR_ASSERT(b.cb_that_ran == second_read_callback); gpr_mu_unlock(&b.mu); - grpc_fd_destroy(em_fd, NULL, NULL); + grpc_fd_orphan(em_fd, NULL, NULL); destroy_change_data(&a); destroy_change_data(&b); - close(sv[0]); - close(sv[1]); -} - -void timeout_callback(void *arg, enum grpc_em_cb_status status) { - if (status == GRPC_CALLBACK_TIMED_OUT) { - gpr_event_set(arg, (void *)1); - } else { - gpr_event_set(arg, (void *)2); - } -} - -void test_grpc_fd_notify_timeout() { - grpc_fd *em_fd; - gpr_event ev; - int flags; - int sv[2]; - gpr_timespec timeout; - gpr_timespec deadline; - - gpr_event_init(&ev); - - GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); - flags = fcntl(sv[0], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); - flags = fcntl(sv[1], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); - - em_fd = grpc_fd_create(sv[0]); - - timeout = gpr_time_from_micros(1000000); - deadline = gpr_time_add(gpr_now(), timeout); - - grpc_fd_notify_on_read(em_fd, timeout_callback, &ev, deadline); - - GPR_ASSERT(gpr_event_wait(&ev, gpr_time_add(deadline, timeout))); - - GPR_ASSERT(gpr_event_get(&ev) == (void *)1); - grpc_fd_destroy(em_fd, NULL, NULL); close(sv[1]); } @@ -518,7 +466,6 @@ int main(int argc, char **argv) { grpc_iomgr_init(); test_grpc_fd(); test_grpc_fd_change(); - test_grpc_fd_notify_timeout(); grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index cb1cd0bc16..2d0a89a1f5 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -44,7 +44,7 @@ #include <grpc/support/time.h> static gpr_timespec test_deadline() { - return gpr_time_add(gpr_now(), gpr_time_from_micros(1000000)); + return gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); } static void must_succeed(void *arg, grpc_endpoint *tcp) { diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 6a4ef0f984..7fd2567cec 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -154,7 +154,7 @@ static void read_cb(void *user_data, gpr_slice *slices, size_t nslices, if (state->read_bytes >= state->target_read_bytes) { gpr_cv_signal(&state->cv); } else { - grpc_endpoint_notify_on_read(state->ep, read_cb, state, gpr_inf_future); + grpc_endpoint_notify_on_read(state->ep, read_cb, state); } gpr_mu_unlock(&state->mu); } @@ -183,7 +183,7 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { state.read_bytes = 0; state.target_read_bytes = written_bytes; - grpc_endpoint_notify_on_read(ep, read_cb, &state, gpr_inf_future); + grpc_endpoint_notify_on_read(ep, read_cb, &state); gpr_mu_lock(&state.mu); for (;;) { @@ -225,7 +225,7 @@ static void large_read_test(ssize_t slice_size) { state.read_bytes = 0; state.target_read_bytes = written_bytes; - grpc_endpoint_notify_on_read(ep, read_cb, &state, gpr_inf_future); + grpc_endpoint_notify_on_read(ep, read_cb, &state); gpr_mu_lock(&state.mu); for (;;) { @@ -363,8 +363,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); - if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state, - gpr_inf_future) == GRPC_ENDPOINT_WRITE_DONE) { + if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state) == + GRPC_ENDPOINT_WRITE_DONE) { /* Write completed immediately */ read_bytes = drain_socket(sv[0]); GPR_ASSERT(read_bytes == num_bytes); @@ -421,15 +421,13 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); - switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state, - gpr_inf_future)) { + switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state)) { case GRPC_ENDPOINT_WRITE_DONE: case GRPC_ENDPOINT_WRITE_ERROR: /* Write completed immediately */ break; case GRPC_ENDPOINT_WRITE_PENDING: - grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL, - gpr_inf_future); + grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL); gpr_mu_lock(&state.mu); for (;;) { if (state.write_done) { diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index cb77a88062..f30ff917cb 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -66,7 +66,7 @@ static void test_no_op() { static void test_no_op_with_start() { grpc_tcp_server *s = grpc_tcp_server_create(); LOG_TEST(); - grpc_tcp_server_start(s, on_connect, NULL); + grpc_tcp_server_start(s, NULL, on_connect, NULL); grpc_tcp_server_destroy(s); } @@ -93,7 +93,7 @@ static void test_no_op_with_port_and_start() { GPR_ASSERT( grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr))); - grpc_tcp_server_start(s, on_connect, NULL); + grpc_tcp_server_start(s, NULL, on_connect, NULL); grpc_tcp_server_destroy(s); } @@ -120,7 +120,7 @@ static void test_connect(int n) { GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0); GPR_ASSERT(addr_len <= sizeof(addr)); - grpc_tcp_server_start(s, on_connect, NULL); + grpc_tcp_server_start(s, NULL, on_connect, NULL); for (i = 0; i < n; i++) { deadline = gpr_time_add(gpr_now(), gpr_time_from_micros(10000000)); diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index 9311d6ba11..d4baa64725 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -153,8 +153,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { int verified = 0; gpr_log(GPR_INFO, "Start test left over"); - grpc_endpoint_notify_on_read(f.client_ep, verify_leftover, &verified, - gpr_inf_future); + grpc_endpoint_notify_on_read(f.client_ep, verify_leftover, &verified); GPR_ASSERT(verified == 1); grpc_endpoint_shutdown(f.client_ep); @@ -187,7 +186,7 @@ static void test_destroy_ep_early(grpc_endpoint_test_config config, grpc_endpoint_test_fixture f = config.create_fixture(slice_size); gpr_log(GPR_INFO, "Start test destroy early"); - grpc_endpoint_notify_on_read(f.client_ep, destroy_early, &f, gpr_inf_future); + grpc_endpoint_notify_on_read(f.client_ep, destroy_early, &f); grpc_endpoint_shutdown(f.server_ep); grpc_endpoint_destroy(f.server_ep); diff --git a/vsprojects/vs2013/grpc.vcxproj b/vsprojects/vs2013/grpc.vcxproj index ffce5f6ecd..6cc5a8b293 100644 --- a/vsprojects/vs2013/grpc.vcxproj +++ b/vsprojects/vs2013/grpc.vcxproj @@ -115,10 +115,12 @@ <ClInclude Include="..\..\src\core\iomgr\alarm_internal.h" /> <ClInclude Include="..\..\src\core\iomgr\endpoint.h" /> <ClInclude Include="..\..\src\core\iomgr\endpoint_pair.h" /> - <ClInclude Include="..\..\src\core\iomgr\iomgr_completion_queue_interface.h" /> + <ClInclude Include="..\..\src\core\iomgr\fd_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\iomgr.h" /> - <ClInclude Include="..\..\src\core\iomgr\iomgr_libevent.h" /> + <ClInclude Include="..\..\src\core\iomgr\iomgr_internal.h" /> + <ClInclude Include="..\..\src\core\iomgr\iomgr_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\pollset.h" /> + <ClInclude Include="..\..\src\core\iomgr\pollset_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\resolve_address.h" /> <ClInclude Include="..\..\src\core\iomgr\sockaddr.h" /> <ClInclude Include="..\..\src\core\iomgr\sockaddr_posix.h" /> @@ -143,9 +145,9 @@ <ClInclude Include="..\..\src\core\surface\server.h" /> <ClInclude Include="..\..\src\core\surface\surface_trace.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\bin_encoder.h" /> + <ClInclude Include="..\..\src\core\transport\chttp2\frame.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_data.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_goaway.h" /> - <ClInclude Include="..\..\src\core\transport\chttp2\frame.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_ping.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_rst_stream.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_settings.h" /> @@ -158,8 +160,8 @@ <ClInclude Include="..\..\src\core\transport\chttp2\stream_encoder.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\stream_map.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\timeout_encoding.h" /> - <ClInclude Include="..\..\src\core\transport\chttp2_transport.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\varint.h" /> + <ClInclude Include="..\..\src\core\transport\chttp2_transport.h" /> <ClInclude Include="..\..\src\core\transport\metadata.h" /> <ClInclude Include="..\..\src\core\transport\stream_op.h" /> <ClInclude Include="..\..\src\core\transport\transport.h" /> @@ -236,11 +238,15 @@ </ClCompile> <ClCompile Include="..\..\src\core\iomgr\endpoint_pair_posix.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\iomgr_libevent.c"> + <ClCompile Include="..\..\src\core\iomgr\fd_posix.c"> + </ClCompile> + <ClCompile Include="..\..\src\core\iomgr\iomgr.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\iomgr_libevent_use_threads.c"> + <ClCompile Include="..\..\src\core\iomgr\iomgr_posix.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\pollset.c"> + <ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c"> + </ClCompile> + <ClCompile Include="..\..\src\core\iomgr\pollset_posix.c"> </ClCompile> <ClCompile Include="..\..\src\core\iomgr\resolve_address_posix.c"> </ClCompile> @@ -332,10 +338,10 @@ </ClCompile> <ClCompile Include="..\..\src\core\transport\chttp2\timeout_encoding.c"> </ClCompile> - <ClCompile Include="..\..\src\core\transport\chttp2_transport.c"> - </ClCompile> <ClCompile Include="..\..\src\core\transport\chttp2\varint.c"> </ClCompile> + <ClCompile Include="..\..\src\core\transport\chttp2_transport.c"> + </ClCompile> <ClCompile Include="..\..\src\core\transport\metadata.c"> </ClCompile> <ClCompile Include="..\..\src\core\transport\stream_op.c"> diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj b/vsprojects/vs2013/grpc_unsecure.vcxproj index ffce5f6ecd..6cc5a8b293 100644 --- a/vsprojects/vs2013/grpc_unsecure.vcxproj +++ b/vsprojects/vs2013/grpc_unsecure.vcxproj @@ -115,10 +115,12 @@ <ClInclude Include="..\..\src\core\iomgr\alarm_internal.h" /> <ClInclude Include="..\..\src\core\iomgr\endpoint.h" /> <ClInclude Include="..\..\src\core\iomgr\endpoint_pair.h" /> - <ClInclude Include="..\..\src\core\iomgr\iomgr_completion_queue_interface.h" /> + <ClInclude Include="..\..\src\core\iomgr\fd_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\iomgr.h" /> - <ClInclude Include="..\..\src\core\iomgr\iomgr_libevent.h" /> + <ClInclude Include="..\..\src\core\iomgr\iomgr_internal.h" /> + <ClInclude Include="..\..\src\core\iomgr\iomgr_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\pollset.h" /> + <ClInclude Include="..\..\src\core\iomgr\pollset_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\resolve_address.h" /> <ClInclude Include="..\..\src\core\iomgr\sockaddr.h" /> <ClInclude Include="..\..\src\core\iomgr\sockaddr_posix.h" /> @@ -143,9 +145,9 @@ <ClInclude Include="..\..\src\core\surface\server.h" /> <ClInclude Include="..\..\src\core\surface\surface_trace.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\bin_encoder.h" /> + <ClInclude Include="..\..\src\core\transport\chttp2\frame.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_data.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_goaway.h" /> - <ClInclude Include="..\..\src\core\transport\chttp2\frame.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_ping.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_rst_stream.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_settings.h" /> @@ -158,8 +160,8 @@ <ClInclude Include="..\..\src\core\transport\chttp2\stream_encoder.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\stream_map.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\timeout_encoding.h" /> - <ClInclude Include="..\..\src\core\transport\chttp2_transport.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\varint.h" /> + <ClInclude Include="..\..\src\core\transport\chttp2_transport.h" /> <ClInclude Include="..\..\src\core\transport\metadata.h" /> <ClInclude Include="..\..\src\core\transport\stream_op.h" /> <ClInclude Include="..\..\src\core\transport\transport.h" /> @@ -236,11 +238,15 @@ </ClCompile> <ClCompile Include="..\..\src\core\iomgr\endpoint_pair_posix.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\iomgr_libevent.c"> + <ClCompile Include="..\..\src\core\iomgr\fd_posix.c"> + </ClCompile> + <ClCompile Include="..\..\src\core\iomgr\iomgr.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\iomgr_libevent_use_threads.c"> + <ClCompile Include="..\..\src\core\iomgr\iomgr_posix.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\pollset.c"> + <ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c"> + </ClCompile> + <ClCompile Include="..\..\src\core\iomgr\pollset_posix.c"> </ClCompile> <ClCompile Include="..\..\src\core\iomgr\resolve_address_posix.c"> </ClCompile> @@ -332,10 +338,10 @@ </ClCompile> <ClCompile Include="..\..\src\core\transport\chttp2\timeout_encoding.c"> </ClCompile> - <ClCompile Include="..\..\src\core\transport\chttp2_transport.c"> - </ClCompile> <ClCompile Include="..\..\src\core\transport\chttp2\varint.c"> </ClCompile> + <ClCompile Include="..\..\src\core\transport\chttp2_transport.c"> + </ClCompile> <ClCompile Include="..\..\src\core\transport\metadata.c"> </ClCompile> <ClCompile Include="..\..\src\core\transport\stream_op.c"> |