diff options
author | ctiller <ctiller@google.com> | 2015-01-07 12:13:17 -0800 |
---|---|---|
committer | Nicolas Noble <nnoble@google.com> | 2015-01-09 17:23:18 -0800 |
commit | e4b409364e4c493a66d4b2a6fe897075aa5c174e (patch) | |
tree | 29467626f50aea49e072e15004dd141625146709 | |
parent | 8232204a36712553b9eedb2dacab13b7c38642c6 (diff) |
Add a --forever flag, to continuously run tests as things change.
Change on 2015/01/07 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=83451760
84 files changed, 1775 insertions, 2333 deletions
@@ -84,7 +84,7 @@ CPPFLAGS += -g -fPIC -Wall -Werror -Wno-long-long LDFLAGS += -g -pthread -fPIC INCLUDES = . include gens -LIBS = rt m z pthread +LIBS = rt m z event event_pthreads pthread LIBSXX = protobuf LIBS_PROTOC = protoc protobuf @@ -143,12 +143,20 @@ else IS_GIT_FOLDER = true endif +EVENT2_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/event2.c -levent $(LDFLAGS) OPENSSL_ALPN_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/openssl-alpn.c -lssl -lcrypto -ldl $(LDFLAGS) ZLIB_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/zlib.c -lz $(LDFLAGS) +HAS_SYSTEM_EVENT2 = $(shell $(EVENT2_CHECK_CMD) 2> /dev/null && echo true || echo false) HAS_SYSTEM_OPENSSL_ALPN = $(shell $(OPENSSL_ALPN_CHECK_CMD) 2> /dev/null && echo true || echo false) HAS_SYSTEM_ZLIB = $(shell $(ZLIB_CHECK_CMD) 2> /dev/null && echo true || echo false) +ifeq ($(wildcard third_party/libevent/include/event2/event.h),) +HAS_EMBEDDED_EVENT2 = false +else +HAS_EMBEDDED_EVENT2 = true +endif + ifeq ($(wildcard third_party/openssl/ssl/ssl.h),) HAS_EMBEDDED_OPENSSL_ALPN = false else @@ -161,6 +169,12 @@ else HAS_EMBEDDED_ZLIB = true endif +ifneq ($(SYSTEM),MINGW32) +ifeq ($(HAS_SYSTEM_EVENT2),false) +DEP_MISSING += libevent +endif +endif + ifeq ($(HAS_SYSTEM_ZLIB),false) ifeq ($(HAS_EMBEDDED_ZLIB),true) ZLIB_DEP = third_party/zlib/libz.a @@ -452,6 +466,7 @@ chttp2_socket_pair_one_byte_at_a_time_thread_stress_test: bins/$(CONFIG)/chttp2_ chttp2_socket_pair_one_byte_at_a_time_writes_done_hangs_with_pending_read_test: bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_writes_done_hangs_with_pending_read_test run_dep_checks: + $(EVENT2_CHECK_CMD) || true $(OPENSSL_ALPN_CHECK_CMD) || true $(ZLIB_CHECK_CMD) || true @@ -1214,11 +1229,9 @@ LIBGRPC_SRC = \ src/core/iomgr/alarm_heap.c \ src/core/iomgr/endpoint.c \ src/core/iomgr/endpoint_pair_posix.c \ - src/core/iomgr/fd_posix.c \ - src/core/iomgr/iomgr.c \ - src/core/iomgr/iomgr_posix.c \ - src/core/iomgr/pollset_multipoller_with_poll_posix.c \ - src/core/iomgr/pollset_posix.c \ + src/core/iomgr/iomgr_libevent.c \ + src/core/iomgr/iomgr_libevent_use_threads.c \ + src/core/iomgr/pollset.c \ src/core/iomgr/resolve_address_posix.c \ src/core/iomgr/sockaddr_utils.c \ src/core/iomgr/socket_utils_common_posix.c \ @@ -1264,8 +1277,8 @@ LIBGRPC_SRC = \ src/core/transport/chttp2/stream_encoder.c \ src/core/transport/chttp2/stream_map.c \ src/core/transport/chttp2/timeout_encoding.c \ - src/core/transport/chttp2/varint.c \ src/core/transport/chttp2_transport.c \ + src/core/transport/chttp2/varint.c \ src/core/transport/metadata.c \ src/core/transport/stream_op.c \ src/core/transport/transport.c \ @@ -1366,11 +1379,9 @@ LIBGRPC_UNSECURE_SRC = \ src/core/iomgr/alarm_heap.c \ src/core/iomgr/endpoint.c \ src/core/iomgr/endpoint_pair_posix.c \ - src/core/iomgr/fd_posix.c \ - src/core/iomgr/iomgr.c \ - src/core/iomgr/iomgr_posix.c \ - src/core/iomgr/pollset_multipoller_with_poll_posix.c \ - src/core/iomgr/pollset_posix.c \ + src/core/iomgr/iomgr_libevent.c \ + src/core/iomgr/iomgr_libevent_use_threads.c \ + src/core/iomgr/pollset.c \ src/core/iomgr/resolve_address_posix.c \ src/core/iomgr/sockaddr_utils.c \ src/core/iomgr/socket_utils_common_posix.c \ @@ -1416,8 +1427,8 @@ LIBGRPC_UNSECURE_SRC = \ src/core/transport/chttp2/stream_encoder.c \ src/core/transport/chttp2/stream_map.c \ src/core/transport/chttp2/timeout_encoding.c \ - src/core/transport/chttp2/varint.c \ src/core/transport/chttp2_transport.c \ + src/core/transport/chttp2/varint.c \ src/core/transport/metadata.c \ src/core/transport/stream_op.c \ src/core/transport/transport.c \ diff --git a/build.json b/build.json index e0e05c8eaf..e0bf280635 100644 --- a/build.json +++ b/build.json @@ -35,11 +35,9 @@ "src/core/iomgr/alarm_heap.c", "src/core/iomgr/endpoint.c", "src/core/iomgr/endpoint_pair_posix.c", - "src/core/iomgr/fd_posix.c", - "src/core/iomgr/iomgr.c", - "src/core/iomgr/iomgr_posix.c", - "src/core/iomgr/pollset_multipoller_with_poll_posix.c", - "src/core/iomgr/pollset_posix.c", + "src/core/iomgr/iomgr_libevent.c", + "src/core/iomgr/iomgr_libevent_use_threads.c", + "src/core/iomgr/pollset.c", "src/core/iomgr/resolve_address_posix.c", "src/core/iomgr/sockaddr_utils.c", "src/core/iomgr/socket_utils_common_posix.c", @@ -85,8 +83,8 @@ "src/core/transport/chttp2/stream_encoder.c", "src/core/transport/chttp2/stream_map.c", "src/core/transport/chttp2/timeout_encoding.c", - "src/core/transport/chttp2/varint.c", "src/core/transport/chttp2_transport.c", + "src/core/transport/chttp2/varint.c", "src/core/transport/metadata.c", "src/core/transport/stream_op.c", "src/core/transport/transport.c", @@ -122,12 +120,10 @@ "src/core/iomgr/alarm_internal.h", "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", - "src/core/iomgr/fd_posix.h", + "src/core/iomgr/iomgr_completion_queue_interface.h", "src/core/iomgr/iomgr.h", - "src/core/iomgr/iomgr_internal.h", - "src/core/iomgr/iomgr_posix.h", + "src/core/iomgr/iomgr_libevent.h", "src/core/iomgr/pollset.h", - "src/core/iomgr/pollset_posix.h", "src/core/iomgr/resolve_address.h", "src/core/iomgr/sockaddr.h", "src/core/iomgr/sockaddr_posix.h", @@ -152,9 +148,9 @@ "src/core/surface/server.h", "src/core/surface/surface_trace.h", "src/core/transport/chttp2/bin_encoder.h", - "src/core/transport/chttp2/frame.h", "src/core/transport/chttp2/frame_data.h", "src/core/transport/chttp2/frame_goaway.h", + "src/core/transport/chttp2/frame.h", "src/core/transport/chttp2/frame_ping.h", "src/core/transport/chttp2/frame_rst_stream.h", "src/core/transport/chttp2/frame_settings.h", @@ -167,8 +163,8 @@ "src/core/transport/chttp2/stream_encoder.h", "src/core/transport/chttp2/stream_map.h", "src/core/transport/chttp2/timeout_encoding.h", - "src/core/transport/chttp2/varint.h", "src/core/transport/chttp2_transport.h", + "src/core/transport/chttp2/varint.h", "src/core/transport/metadata.h", "src/core/transport/stream_op.h", "src/core/transport/transport.h", diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h index 27a7b5529f..9e5c9ff2ac 100644 --- a/include/grpc/support/port_platform.h +++ b/include/grpc/support/port_platform.h @@ -54,7 +54,6 @@ #define GPR_CPU_LINUX 1 #define GPR_GCC_SYNC 1 #define GPR_LIBEVENT 1 -#define GPR_POSIX_MULTIPOLL_WITH_POLL 1 #define GPR_POSIX_SOCKET 1 #define GPR_POSIX_SOCKETADDR 1 #define GPR_POSIX_SOCKETUTILS 1 @@ -66,7 +65,6 @@ #define GPR_GCC_ATOMIC 1 #define GPR_LIBEVENT 1 #define GPR_LINUX 1 -#define GPR_POSIX_MULTIPOLL_WITH_POLL 1 #define GPR_POSIX_SOCKET 1 #define GPR_POSIX_SOCKETADDR 1 #define GPR_POSIX_STRING 1 @@ -82,7 +80,6 @@ #define GPR_GCC_ATOMIC 1 #define GPR_LIBEVENT 1 #define GPR_POSIX_LOG 1 -#define GPR_POSIX_MULTIPOLL_WITH_POLL 1 #define GPR_POSIX_SOCKET 1 #define GPR_POSIX_SOCKETADDR 1 #define GPR_POSIX_SOCKETUTILS 1 diff --git a/include/grpc/support/time.h b/include/grpc/support/time.h index 41d1e88dc9..5a57d94768 100644 --- a/include/grpc/support/time.h +++ b/include/grpc/support/time.h @@ -94,8 +94,6 @@ gpr_timespec gpr_time_from_seconds(long x); gpr_timespec gpr_time_from_minutes(long x); gpr_timespec gpr_time_from_hours(long x); -gpr_int32 gpr_time_to_millis(gpr_timespec timespec); - /* Return 1 if two times are equal or within threshold of each other, 0 otherwise */ int gpr_time_similar(gpr_timespec a, gpr_timespec b, gpr_timespec threshold); diff --git a/src/compiler/ruby_generator.cc b/src/compiler/ruby_generator.cc index 20485b47a5..c5c5884736 100644 --- a/src/compiler/ruby_generator.cc +++ b/src/compiler/ruby_generator.cc @@ -104,11 +104,6 @@ void PrintService(const ServiceDescriptor* service, const string& package, out->Print("\n"); out->Print("self.marshal_class_method = :encode\n"); out->Print("self.unmarshal_class_method = :decode\n"); - map<string, string> pkg_vars = ListToDict({ - "service.name", service->name(), - "pkg.name", package, - }); - out->Print(pkg_vars, "self.service_name = '$pkg.name$.$service.name$'\n"); out->Print("\n"); for (int i = 0; i < service->method_count(); ++i) { PrintMethod(service->method(i), package, out); diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c index 3778f4fb88..e67b823697 100644 --- a/src/core/channel/child_channel.c +++ b/src/core/channel/child_channel.c @@ -85,19 +85,19 @@ static void lb_channel_op(grpc_channel_element *elem, grpc_channel_op *op) { lb_channel_data *chand = elem->channel_data; grpc_channel_element *back; - int calling_back = 0; switch (op->dir) { case GRPC_CALL_UP: gpr_mu_lock(&chand->mu); back = chand->back; - if (back) { - chand->calling_back++; - calling_back = 1; - } + if (back) chand->calling_back++; gpr_mu_unlock(&chand->mu); if (back) { back->filter->channel_op(chand->back, elem, op); + gpr_mu_lock(&chand->mu); + chand->calling_back--; + gpr_cv_broadcast(&chand->cv); + gpr_mu_unlock(&chand->mu); } else if (op->type == GRPC_TRANSPORT_GOAWAY) { gpr_slice_unref(op->data.goaway.message); } @@ -107,27 +107,23 @@ static void lb_channel_op(grpc_channel_element *elem, break; } - gpr_mu_lock(&chand->mu); switch (op->type) { case GRPC_TRANSPORT_CLOSED: + gpr_mu_lock(&chand->mu); chand->disconnected = 1; maybe_destroy_channel(grpc_channel_stack_from_top_element(elem)); + gpr_mu_unlock(&chand->mu); break; case GRPC_CHANNEL_GOAWAY: + gpr_mu_lock(&chand->mu); chand->sent_goaway = 1; + gpr_mu_unlock(&chand->mu); break; case GRPC_CHANNEL_DISCONNECT: case GRPC_TRANSPORT_GOAWAY: case GRPC_ACCEPT_CALL: break; } - - if (calling_back) { - chand->calling_back--; - gpr_cv_signal(&chand->cv); - maybe_destroy_channel(grpc_channel_stack_from_top_element(elem)); - } - gpr_mu_unlock(&chand->mu); } /* Constructor for call_data */ @@ -181,9 +177,7 @@ const grpc_channel_filter grpc_child_channel_top_filter = { #define LINK_BACK_ELEM_FROM_CALL(call) grpc_call_stack_element((call), 0) -static void finally_destroy_channel(void *c, int success) { - /* ignore success or not... this is a destruction callback and will only - happen once - the only purpose here is to release resources */ +static void finally_destroy_channel(void *c, grpc_iomgr_cb_status status) { grpc_child_channel *channel = c; lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data; /* wait for the initiator to leave the mutex */ @@ -193,7 +187,7 @@ static void finally_destroy_channel(void *c, int success) { gpr_free(channel); } -static void send_farewells(void *c, int success) { +static void send_farewells(void *c, grpc_iomgr_cb_status status) { grpc_child_channel *channel = c; grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel); lb_channel_data *chand = lbelem->channel_data; @@ -227,7 +221,7 @@ static void send_farewells(void *c, int success) { static void maybe_destroy_channel(grpc_child_channel *channel) { lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data; if (chand->destroyed && chand->disconnected && chand->active_calls == 0 && - !chand->sending_farewell && !chand->calling_back) { + !chand->sending_farewell) { grpc_iomgr_add_callback(finally_destroy_channel, channel); } else if (chand->destroyed && !chand->disconnected && chand->active_calls == 0 && !chand->sending_farewell && @@ -255,16 +249,14 @@ grpc_child_channel *grpc_child_channel_create( return stk; } -void grpc_child_channel_destroy(grpc_child_channel *channel, - int wait_for_callbacks) { +void grpc_child_channel_destroy(grpc_child_channel *channel) { grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel); lb_channel_data *chand = lbelem->channel_data; gpr_mu_lock(&chand->mu); - while (wait_for_callbacks && chand->calling_back) { + while (chand->calling_back) { gpr_cv_wait(&chand->cv, &chand->mu, gpr_inf_future); } - chand->back = NULL; chand->destroyed = 1; maybe_destroy_channel(channel); diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h index 3ba4c1b8a9..9fb2a17e29 100644 --- a/src/core/channel/child_channel.h +++ b/src/core/channel/child_channel.h @@ -53,8 +53,7 @@ void grpc_child_channel_handle_op(grpc_child_channel *channel, grpc_channel_op *op); grpc_channel_element *grpc_child_channel_get_bottom_element( grpc_child_channel *channel); -void grpc_child_channel_destroy(grpc_child_channel *channel, - int wait_for_callbacks); +void grpc_child_channel_destroy(grpc_child_channel *channel); grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel, grpc_call_element *parent); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 46283835a0..fd883a08ca 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -294,6 +294,14 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, } } +static void finally_destroy_channel(void *arg, grpc_iomgr_cb_status status) { + grpc_child_channel_destroy(arg); +} + +static void destroy_channel_later(grpc_child_channel *channel) { + grpc_iomgr_add_callback(finally_destroy_channel, channel); +} + static void channel_op(grpc_channel_element *elem, grpc_channel_element *from_elem, grpc_channel_op *op) { channel_data *chand = elem->channel_data; @@ -309,7 +317,7 @@ static void channel_op(grpc_channel_element *elem, gpr_mu_unlock(&chand->mu); if (child_channel) { grpc_child_channel_handle_op(child_channel, op); - grpc_child_channel_destroy(child_channel, 1); + destroy_channel_later(child_channel); } else { gpr_slice_unref(op->data.goaway.message); } @@ -321,7 +329,7 @@ static void channel_op(grpc_channel_element *elem, chand->active_child = NULL; gpr_mu_unlock(&chand->mu); if (child_channel) { - grpc_child_channel_destroy(child_channel, 1); + destroy_channel_later(child_channel); } break; case GRPC_TRANSPORT_GOAWAY: @@ -336,7 +344,7 @@ static void channel_op(grpc_channel_element *elem, } gpr_mu_unlock(&chand->mu); if (child_channel) { - grpc_child_channel_destroy(child_channel, 0); + destroy_channel_later(child_channel); } gpr_slice_unref(op->data.goaway.message); break; @@ -352,7 +360,7 @@ static void channel_op(grpc_channel_element *elem, } gpr_mu_unlock(&chand->mu); if (child_channel) { - grpc_child_channel_destroy(child_channel, 0); + destroy_channel_later(child_channel); } break; default: @@ -437,7 +445,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) { grpc_transport_setup_cancel(chand->transport_setup); if (chand->active_child) { - grpc_child_channel_destroy(chand->active_child, 1); + grpc_child_channel_destroy(chand->active_child); chand->active_child = NULL; } @@ -541,7 +549,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( gpr_free(child_filters); if (old_active) { - grpc_child_channel_destroy(old_active, 1); + grpc_child_channel_destroy(old_active); } return result; diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c index ebaf816902..b1194e278d 100644 --- a/src/core/channel/client_setup.c +++ b/src/core/channel/client_setup.c @@ -166,7 +166,8 @@ int grpc_client_setup_request_should_continue(grpc_client_setup_request *r) { return result; } -static void backoff_alarm_done(void *arg /* grpc_client_setup */, int success) { +static void backoff_alarm_done(void *arg /* grpc_client_setup */, + grpc_iomgr_cb_status status) { grpc_client_setup *s = arg; grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request)); r->setup = s; @@ -176,7 +177,7 @@ static void backoff_alarm_done(void *arg /* grpc_client_setup */, int success) { gpr_mu_lock(&s->mu); s->active_request = r; s->in_alarm = 0; - if (!success) { + if (status != GRPC_CALLBACK_SUCCESS) { if (0 == --s->refs) { gpr_mu_unlock(&s->mu); destroy_setup(s); diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 2143eeb63d..06d73e40f5 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -101,11 +101,12 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices, switch (status) { case GRPC_ENDPOINT_CB_OK: - grpc_endpoint_notify_on_read(req->ep, on_read, req); + grpc_endpoint_notify_on_read(req->ep, on_read, req, gpr_inf_future); break; case GRPC_ENDPOINT_CB_EOF: case GRPC_ENDPOINT_CB_ERROR: case GRPC_ENDPOINT_CB_SHUTDOWN: + case GRPC_ENDPOINT_CB_TIMED_OUT: if (!req->have_read_byte) { next_address(req); } else { @@ -122,7 +123,7 @@ done: static void on_written(internal_request *req) { gpr_log(GPR_DEBUG, "%s", __FUNCTION__); - grpc_endpoint_notify_on_read(req->ep, on_read, req); + grpc_endpoint_notify_on_read(req->ep, on_read, req, gpr_inf_future); } static void done_write(void *arg, grpc_endpoint_cb_status status) { @@ -135,6 +136,7 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) { case GRPC_ENDPOINT_CB_EOF: case GRPC_ENDPOINT_CB_SHUTDOWN: case GRPC_ENDPOINT_CB_ERROR: + case GRPC_ENDPOINT_CB_TIMED_OUT: next_address(req); break; } @@ -143,8 +145,8 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) { static void start_write(internal_request *req) { gpr_slice_ref(req->request_text); gpr_log(GPR_DEBUG, "%s", __FUNCTION__); - switch ( - grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) { + switch (grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req, + gpr_inf_future)) { case GRPC_ENDPOINT_WRITE_DONE: on_written(req); break; diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index 2664879323..b7238f716a 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -71,8 +71,8 @@ static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ static shard_type *g_shard_queue[NUM_SHARDS]; -static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, - gpr_timespec *next, int success); +static int run_some_expired_alarms(gpr_timespec now, + grpc_iomgr_cb_status status); static gpr_timespec compute_min_deadline(shard_type *shard) { return grpc_alarm_heap_is_empty(&shard->heap) @@ -102,7 +102,7 @@ void grpc_alarm_list_init(gpr_timespec now) { void grpc_alarm_list_shutdown() { int i; - while (run_some_expired_alarms(NULL, gpr_inf_future, NULL, 0)) + while (run_some_expired_alarms(gpr_inf_future, GRPC_CALLBACK_CANCELLED)) ; for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -233,7 +233,7 @@ void grpc_alarm_cancel(grpc_alarm *alarm) { gpr_mu_unlock(&shard->mu); if (triggered) { - alarm->cb(alarm->cb_arg, 0); + alarm->cb(alarm->cb_arg, GRPC_CALLBACK_CANCELLED); } } @@ -299,8 +299,8 @@ static size_t pop_alarms(shard_type *shard, gpr_timespec now, return n; } -static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, - gpr_timespec *next, int success) { +static int run_some_expired_alarms(gpr_timespec now, + grpc_iomgr_cb_status status) { size_t n = 0; size_t i; grpc_alarm *alarms[MAX_ALARMS_PER_CHECK]; @@ -329,35 +329,19 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, note_deadline_change(g_shard_queue[0]); } - if (next) { - *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline); - } - gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_checker_mu); - } else if (next) { - gpr_mu_lock(&g_mu); - *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline); - gpr_mu_unlock(&g_mu); - } - - if (n && drop_mu) { - gpr_mu_unlock(drop_mu); } for (i = 0; i < n; i++) { - alarms[i]->cb(alarms[i]->cb_arg, success); - } - - if (n && drop_mu) { - gpr_mu_lock(drop_mu); + alarms[i]->cb(alarms[i]->cb_arg, status); } return n; } -int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) { - return run_some_expired_alarms(drop_mu, now, next, 1); +int grpc_alarm_check(gpr_timespec now) { + return run_some_expired_alarms(now, GRPC_CALLBACK_SUCCESS); } gpr_timespec grpc_alarm_list_next_timeout() { diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h index 12b6ab4286..e605ff84f9 100644 --- a/src/core/iomgr/alarm_internal.h +++ b/src/core/iomgr/alarm_internal.h @@ -34,12 +34,9 @@ #ifndef __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_ #define __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_ -#include <grpc/support/sync.h> -#include <grpc/support/time.h> - /* iomgr internal api for dealing with alarms */ -int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next); +int grpc_alarm_check(gpr_timespec now); void grpc_alarm_list_init(gpr_timespec now); void grpc_alarm_list_shutdown(); diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index 9e5d56389d..f1944bf672 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -34,16 +34,14 @@ #include "src/core/iomgr/endpoint.h" void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data) { - ep->vtable->notify_on_read(ep, cb, user_data); + void *user_data, gpr_timespec deadline) { + ep->vtable->notify_on_read(ep, cb, user_data, deadline); } -grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, - gpr_slice *slices, - size_t nslices, - grpc_endpoint_write_cb cb, - void *user_data) { - return ep->vtable->write(ep, slices, nslices, cb, user_data); +grpc_endpoint_write_status grpc_endpoint_write( + grpc_endpoint *ep, gpr_slice *slices, size_t nslices, + grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) { + return ep->vtable->write(ep, slices, nslices, cb, user_data, deadline); } void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index ec86d9a146..bbd800bea8 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -48,7 +48,8 @@ typedef enum grpc_endpoint_cb_status { GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */ GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */ GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */ - GRPC_ENDPOINT_CB_ERROR /* Call interrupted by socket error */ + GRPC_ENDPOINT_CB_ERROR, /* Call interrupted by socket error */ + GRPC_ENDPOINT_CB_TIMED_OUT /* Call timed out */ } grpc_endpoint_cb_status; typedef enum grpc_endpoint_write_status { @@ -65,10 +66,10 @@ typedef void (*grpc_endpoint_write_cb)(void *user_data, struct grpc_endpoint_vtable { void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data); + void *user_data, gpr_timespec deadline); grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices, size_t nslices, grpc_endpoint_write_cb cb, - void *user_data); + void *user_data, gpr_timespec deadline); void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); void (*shutdown)(grpc_endpoint *ep); void (*destroy)(grpc_endpoint *ep); @@ -76,7 +77,7 @@ struct grpc_endpoint_vtable { /* When data is available on the connection, calls the callback with slices. */ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data); + void *user_data, gpr_timespec deadline); /* Write slices out to the socket. @@ -84,11 +85,9 @@ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, returns GRPC_ENDPOINT_WRITE_DONE. Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the connection is ready for more data. */ -grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, - gpr_slice *slices, - size_t nslices, - grpc_endpoint_write_cb cb, - void *user_data); +grpc_endpoint_write_status grpc_endpoint_write( + grpc_endpoint *ep, gpr_slice *slices, size_t nslices, + grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline); /* Causes any pending read/write callbacks to run immediately with GRPC_ENDPOINT_CB_SHUTDOWN status */ diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c deleted file mode 100644 index 3cd2f9a8e0..0000000000 --- a/src/core/iomgr/fd_posix.c +++ /dev/null @@ -1,274 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/iomgr/fd_posix.h" - -#include <assert.h> -#include <unistd.h> - -#include "src/core/iomgr/iomgr_internal.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/useful.h> - -enum descriptor_state { NOT_READY, READY, WAITING }; - -static void destroy(grpc_fd *fd) { - grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data); - gpr_mu_destroy(&fd->set_state_mu); - gpr_free(fd->watchers); - gpr_free(fd); - grpc_iomgr_unref(); -} - -static void ref_by(grpc_fd *fd, int n) { - gpr_atm_no_barrier_fetch_add(&fd->refst, n); -} - -static void unref_by(grpc_fd *fd, int n) { - if (gpr_atm_full_fetch_add(&fd->refst, -n) == n) { - destroy(fd); - } -} - -static void do_nothing(void *ignored, int success) {} - -grpc_fd *grpc_fd_create(int fd) { - grpc_fd *r = gpr_malloc(sizeof(grpc_fd)); - grpc_iomgr_ref(); - gpr_atm_rel_store(&r->refst, 1); - gpr_atm_rel_store(&r->readst.state, NOT_READY); - gpr_atm_rel_store(&r->writest.state, NOT_READY); - gpr_mu_init(&r->set_state_mu); - gpr_mu_init(&r->watcher_mu); - gpr_atm_rel_store(&r->shutdown, 0); - r->fd = fd; - r->watchers = NULL; - r->watcher_count = 0; - r->watcher_capacity = 0; - grpc_pollset_add_fd(grpc_backup_pollset(), r); - return r; -} - -int grpc_fd_is_orphaned(grpc_fd *fd) { - return (gpr_atm_acq_load(&fd->refst) & 1) == 0; -} - -static void wake_watchers(grpc_fd *fd) { - size_t i, n; - gpr_mu_lock(&fd->watcher_mu); - n = fd->watcher_count; - for (i = 0; i < n; i++) { - grpc_pollset_force_kick(fd->watchers[i]); - } - gpr_mu_unlock(&fd->watcher_mu); -} - -void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { - fd->on_done = on_done ? on_done : do_nothing; - fd->on_done_user_data = user_data; - ref_by(fd, 1); /* remove active status, but keep referenced */ - wake_watchers(fd); - close(fd->fd); - unref_by(fd, 2); /* drop the reference */ -} - -/* increment refcount by two to avoid changing the orphan bit */ -void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } - -void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } - -typedef struct { - grpc_iomgr_cb_func cb; - void *arg; -} callback; - -static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, - int allow_synchronous_callback) { - if (allow_synchronous_callback) { - cb(arg, success); - } else { - grpc_iomgr_add_delayed_callback(cb, arg, success); - } -} - -static void make_callbacks(callback *callbacks, size_t n, int success, - int allow_synchronous_callback) { - size_t i; - for (i = 0; i < n; i++) { - make_callback(callbacks[i].cb, callbacks[i].arg, success, - allow_synchronous_callback); - } -} - -static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb, - void *arg, int allow_synchronous_callback) { - switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) { - case NOT_READY: - /* There is no race if the descriptor is already ready, so we skip - the interlocked op in that case. As long as the app doesn't - try to set the same upcall twice (which it shouldn't) then - oldval should never be anything other than READY or NOT_READY. We - don't - check for user error on the fast path. */ - st->cb = cb; - st->cb_arg = arg; - if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) { - /* swap was successful -- the closure will run after the next - set_ready call. NOTE: we don't have an ABA problem here, - since we should never have concurrent calls to the same - notify_on function. */ - wake_watchers(fd); - return; - } - /* swap was unsuccessful due to an intervening set_ready call. - Fall through to the READY code below */ - case READY: - assert(gpr_atm_acq_load(&st->state) == READY); - gpr_atm_rel_store(&st->state, NOT_READY); - make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown), - allow_synchronous_callback); - return; - case WAITING: - /* upcallptr was set to a different closure. This is an error! */ - gpr_log(GPR_ERROR, - "User called a notify_on function with a previous callback still " - "pending"); - abort(); - } - gpr_log(GPR_ERROR, "Corrupt memory in &st->state"); - abort(); -} - -static void set_ready_locked(grpc_fd_state *st, callback *callbacks, - size_t *ncallbacks) { - callback *c; - - switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) { - case NOT_READY: - if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) { - /* swap was successful -- the closure will run after the next - notify_on call. */ - return; - } - /* swap was unsuccessful due to an intervening set_ready call. - Fall through to the WAITING code below */ - case WAITING: - assert(gpr_atm_acq_load(&st->state) == WAITING); - c = &callbacks[(*ncallbacks)++]; - c->cb = st->cb; - c->arg = st->cb_arg; - gpr_atm_rel_store(&st->state, NOT_READY); - return; - case READY: - /* duplicate ready, ignore */ - return; - } -} - -static void set_ready(grpc_fd *fd, grpc_fd_state *st, - int allow_synchronous_callback) { - /* only one set_ready can be active at once (but there may be a racing - notify_on) */ - int success; - callback cb; - size_t ncb = 0; - gpr_mu_lock(&fd->set_state_mu); - set_ready_locked(st, &cb, &ncb); - gpr_mu_unlock(&fd->set_state_mu); - success = !gpr_atm_acq_load(&fd->shutdown); - make_callbacks(&cb, ncb, success, allow_synchronous_callback); -} - -void grpc_fd_shutdown(grpc_fd *fd) { - callback cb[2]; - size_t ncb = 0; - gpr_mu_lock(&fd->set_state_mu); - GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown)); - gpr_atm_rel_store(&fd->shutdown, 1); - set_ready_locked(&fd->readst, cb, &ncb); - set_ready_locked(&fd->writest, cb, &ncb); - gpr_mu_unlock(&fd->set_state_mu); - make_callbacks(cb, ncb, 0, 0); -} - -void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb, - void *read_cb_arg) { - notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0); -} - -void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, - void *write_cb_arg) { - notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0); -} - -gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, - gpr_uint32 read_mask, gpr_uint32 write_mask) { - /* keep track of pollers that have requested our events, in case they change - */ - gpr_mu_lock(&fd->watcher_mu); - if (fd->watcher_capacity == fd->watcher_count) { - fd->watcher_capacity = - GPR_MAX(fd->watcher_capacity + 8, fd->watcher_capacity * 3 / 2); - fd->watchers = gpr_realloc(fd->watchers, - fd->watcher_capacity * sizeof(grpc_pollset *)); - } - fd->watchers[fd->watcher_count++] = pollset; - gpr_mu_unlock(&fd->watcher_mu); - - return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) | - (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0); -} - -void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset) { - size_t r, w, n; - - gpr_mu_lock(&fd->watcher_mu); - n = fd->watcher_count; - for (r = 0, w = 0; r < n; r++) { - if (fd->watchers[r] == pollset) { - fd->watcher_count--; - continue; - } - fd->watchers[w++] = fd->watchers[r]; - } - gpr_mu_unlock(&fd->watcher_mu); -} - -void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) { - set_ready(fd, &fd->readst, allow_synchronous_callback); -} - -void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) { - set_ready(fd, &fd->writest, allow_synchronous_callback); -} diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h deleted file mode 100644 index 232de0c3e0..0000000000 --- a/src/core/iomgr/fd_posix.h +++ /dev/null @@ -1,138 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ -#define __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ - -#include "src/core/iomgr/iomgr.h" -#include "src/core/iomgr/pollset.h" -#include <grpc/support/atm.h> -#include <grpc/support/sync.h> -#include <grpc/support/time.h> - -typedef struct { - grpc_iomgr_cb_func cb; - void *cb_arg; - int success; - gpr_atm state; -} grpc_fd_state; - -typedef struct grpc_fd { - int fd; - /* refst format: - bit0: 1=active/0=orphaned - bit1-n: refcount - meaning that mostly we ref by two to avoid altering the orphaned bit, - and just unref by 1 when we're ready to flag the object as orphaned */ - gpr_atm refst; - - gpr_mu set_state_mu; - gpr_atm shutdown; - - gpr_mu watcher_mu; - grpc_pollset **watchers; - size_t watcher_count; - size_t watcher_capacity; - - grpc_fd_state readst; - grpc_fd_state writest; - - grpc_iomgr_cb_func on_done; - void *on_done_user_data; -} grpc_fd; - -/* Create a wrapped file descriptor. - Requires fd is a non-blocking file descriptor. - This takes ownership of closing fd. */ -grpc_fd *grpc_fd_create(int fd); - -/* Releases fd to be asynchronously destroyed. - on_done is called when the underlying file descriptor is definitely close()d. - If on_done is NULL, no callback will be made. - Requires: *fd initialized; no outstanding notify_on_read or - notify_on_write. */ -void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data); - -/* Begin polling on an fd. - Registers that the given pollset is interested in this fd - so that if read - or writability interest changes, the pollset can be kicked to pick up that - new interest. - Return value is: - (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0) - i.e. a combination of read_mask and write_mask determined by the fd's current - interest in said events. - Polling strategies that do not need to alter their behavior depending on the - fd's current interest (such as epoll) do not need to call this function. */ -gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, - gpr_uint32 read_mask, gpr_uint32 write_mask); -/* Complete polling previously started with grpc_fd_begin_poll */ -void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset); - -/* Return 1 if this fd is orphaned, 0 otherwise */ -int grpc_fd_is_orphaned(grpc_fd *fd); - -/* Cause any current callbacks to error out with GRPC_CALLBACK_CANCELLED. */ -void grpc_fd_shutdown(grpc_fd *fd); - -/* Register read interest, causing read_cb to be called once when fd becomes - readable, on deadline specified by deadline, or on shutdown triggered by - grpc_fd_shutdown. - read_cb will be called with read_cb_arg when *fd becomes readable. - read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable, - GRPC_CALLBACK_TIMED_OUT if the call timed out, - and CANCELLED if the call was cancelled. - - Requires:This method must not be called before the read_cb for any previous - call runs. Edge triggered events are used whenever they are supported by the - underlying platform. This means that users must drain fd in read_cb before - calling notify_on_read again. Users are also expected to handle spurious - events, i.e read_cb is called while nothing can be readable from fd */ -void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb, - void *read_cb_arg); - -/* Exactly the same semantics as above, except based on writable events. */ -void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, - void *write_cb_arg); - -/* Notification from the poller to an fd that it has become readable or - writable. - If allow_synchronous_callback is 1, allow running the fd callback inline - in this callstack, otherwise register an asynchronous callback and return */ -void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback); -void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback); - -/* Reference counting for fds */ -void grpc_fd_ref(grpc_fd *fd); -void grpc_fd_unref(grpc_fd *fd); - -#endif /* __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ */ diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c deleted file mode 100644 index 03f56a50a3..0000000000 --- a/src/core/iomgr/iomgr.c +++ /dev/null @@ -1,204 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/iomgr/iomgr.h" - -#include <stdlib.h> - -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/alarm_internal.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/thd.h> -#include <grpc/support/sync.h> - -typedef struct delayed_callback { - grpc_iomgr_cb_func cb; - void *cb_arg; - int success; - struct delayed_callback *next; -} delayed_callback; - -static gpr_mu g_mu; -static gpr_cv g_cv; -static delayed_callback *g_cbs_head = NULL; -static delayed_callback *g_cbs_tail = NULL; -static int g_shutdown; -static int g_refs; -static gpr_event g_background_callback_executor_done; - -/* Execute followup callbacks continuously. - Other threads may check in and help during pollset_work() */ -static void background_callback_executor(void *ignored) { - gpr_mu_lock(&g_mu); - while (!g_shutdown) { - gpr_timespec deadline = gpr_inf_future; - if (g_cbs_head) { - delayed_callback *cb = g_cbs_head; - g_cbs_head = cb->next; - if (!g_cbs_head) g_cbs_tail = NULL; - gpr_mu_unlock(&g_mu); - cb->cb(cb->cb_arg, cb->success); - gpr_free(cb); - gpr_mu_lock(&g_mu); - } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) { - } else { - gpr_cv_wait(&g_cv, &g_mu, deadline); - } - } - gpr_mu_unlock(&g_mu); - gpr_event_set(&g_background_callback_executor_done, (void *)1); -} - -void grpc_kick_poller() { gpr_cv_broadcast(&g_cv); } - -void grpc_iomgr_init() { - gpr_thd_id id; - gpr_mu_init(&g_mu); - gpr_cv_init(&g_cv); - grpc_alarm_list_init(gpr_now()); - g_refs = 0; - grpc_iomgr_platform_init(); - gpr_event_init(&g_background_callback_executor_done); - gpr_thd_new(&id, background_callback_executor, NULL, NULL); -} - -void grpc_iomgr_shutdown() { - delayed_callback *cb; - gpr_timespec shutdown_deadline = - gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); - - grpc_iomgr_platform_shutdown(); - - gpr_mu_lock(&g_mu); - g_shutdown = 1; - while (g_cbs_head || g_refs) { - gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs, - g_cbs_head ? " and executing final callbacks" : ""); - while (g_cbs_head) { - cb = g_cbs_head; - g_cbs_head = cb->next; - if (!g_cbs_head) g_cbs_tail = NULL; - gpr_mu_unlock(&g_mu); - - cb->cb(cb->cb_arg, 0); - gpr_free(cb); - gpr_mu_lock(&g_mu); - } - if (g_refs) { - if (gpr_cv_wait(&g_cv, &g_mu, shutdown_deadline) && g_cbs_head == NULL) { - gpr_log(GPR_DEBUG, - "Failed to free %d iomgr objects before shutdown deadline: " - "memory leaks are likely", - g_refs); - break; - } - } - } - gpr_mu_unlock(&g_mu); - - gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future); - - grpc_alarm_list_shutdown(); - gpr_mu_destroy(&g_mu); - gpr_cv_destroy(&g_cv); -} - -void grpc_iomgr_ref() { - gpr_mu_lock(&g_mu); - ++g_refs; - gpr_mu_unlock(&g_mu); -} - -void grpc_iomgr_unref() { - gpr_mu_lock(&g_mu); - if (0 == --g_refs) { - gpr_cv_signal(&g_cv); - } - gpr_mu_unlock(&g_mu); -} - -void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, - int success) { - delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback)); - dcb->cb = cb; - dcb->cb_arg = cb_arg; - dcb->success = success; - gpr_mu_lock(&g_mu); - dcb->next = NULL; - if (!g_cbs_tail) { - g_cbs_head = g_cbs_tail = dcb; - } else { - g_cbs_tail->next = dcb; - g_cbs_tail = dcb; - } - gpr_cv_signal(&g_cv); - gpr_mu_unlock(&g_mu); -} - -void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) { - grpc_iomgr_add_delayed_callback(cb, cb_arg, 1); -} - -int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { - int n = 0; - gpr_mu *retake_mu = NULL; - delayed_callback *cb; - for (;;) { - /* check for new work */ - if (!gpr_mu_trylock(&g_mu)) { - break; - } - cb = g_cbs_head; - if (!cb) { - gpr_mu_unlock(&g_mu); - break; - } - g_cbs_head = cb->next; - if (!g_cbs_head) g_cbs_tail = NULL; - gpr_mu_unlock(&g_mu); - /* if we have a mutex to drop, do so before executing work */ - if (drop_mu) { - gpr_mu_unlock(drop_mu); - retake_mu = drop_mu; - drop_mu = NULL; - } - cb->cb(cb->cb_arg, success && cb->success); - gpr_free(cb); - n++; - } - if (retake_mu) { - gpr_mu_lock(retake_mu); - } - return n; -} diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index 16991a9b90..cf39f947bc 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -34,8 +34,17 @@ #ifndef __GRPC_INTERNAL_IOMGR_IOMGR_H__ #define __GRPC_INTERNAL_IOMGR_IOMGR_H__ +/* Status passed to callbacks for grpc_em_fd_notify_on_read and + grpc_em_fd_notify_on_write. */ +typedef enum grpc_em_cb_status { + GRPC_CALLBACK_SUCCESS = 0, + GRPC_CALLBACK_TIMED_OUT, + GRPC_CALLBACK_CANCELLED, + GRPC_CALLBACK_DO_NOT_USE +} grpc_iomgr_cb_status; + /* gRPC Callback definition */ -typedef void (*grpc_iomgr_cb_func)(void *arg, int success); +typedef void (*grpc_iomgr_cb_func)(void *arg, grpc_iomgr_cb_status status); void grpc_iomgr_init(); void grpc_iomgr_shutdown(); diff --git a/src/core/iomgr/iomgr_posix.h b/src/core/iomgr/iomgr_completion_queue_interface.h index ca5af3e527..3c4efe773a 100644 --- a/src/core/iomgr/iomgr_posix.h +++ b/src/core/iomgr/iomgr_completion_queue_interface.h @@ -31,12 +31,15 @@ * */ -#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ -#define __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ +#define __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ -#include "src/core/iomgr/iomgr_internal.h" +/* Internals of iomgr that are exposed only to be used for completion queue + implementation */ -void grpc_pollset_global_init(); -void grpc_pollset_global_shutdown(); +extern gpr_mu grpc_iomgr_mu; +extern gpr_cv grpc_iomgr_cv; -#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ */ +int grpc_iomgr_work(gpr_timespec deadline); + +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ */ diff --git a/src/core/iomgr/iomgr_libevent.c b/src/core/iomgr/iomgr_libevent.c new file mode 100644 index 0000000000..6188ab2749 --- /dev/null +++ b/src/core/iomgr/iomgr_libevent.c @@ -0,0 +1,652 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/iomgr_libevent.h" + +#include <unistd.h> +#include <fcntl.h> + +#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/alarm_internal.h" +#include <grpc/support/atm.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> +#include <event2/event.h> +#include <event2/thread.h> + +#define ALARM_TRIGGER_INIT ((gpr_atm)0) +#define ALARM_TRIGGER_INCREMENT ((gpr_atm)1) +#define DONE_SHUTDOWN ((void *)1) + +#define POLLER_ID_INVALID ((gpr_atm)-1) + +/* Global data */ +struct event_base *g_event_base; +gpr_mu grpc_iomgr_mu; +gpr_cv grpc_iomgr_cv; +static grpc_libevent_activation_data *g_activation_queue; +static int g_num_pollers; +static int g_num_fds; +static int g_num_address_resolutions; +static gpr_timespec g_last_poll_completed; +static int g_shutdown_backup_poller; +static gpr_event g_backup_poller_done; +/* activated to break out of the event loop early */ +static struct event *g_timeout_ev; +/* activated to safely break polling from other threads */ +static struct event *g_break_ev; +static grpc_fd *g_fds_to_free; + +int evthread_use_threads(void); +static void grpc_fd_impl_destroy(grpc_fd *impl); + +void grpc_iomgr_ref_address_resolution(int delta) { + gpr_mu_lock(&grpc_iomgr_mu); + GPR_ASSERT(!g_shutdown_backup_poller); + g_num_address_resolutions += delta; + if (0 == g_num_address_resolutions) { + gpr_cv_broadcast(&grpc_iomgr_cv); + } + gpr_mu_unlock(&grpc_iomgr_mu); +} + +/* If anything is in the work queue, process one item and return 1. + Return 0 if there were no work items to complete. + Requires grpc_iomgr_mu locked, may unlock and relock during the call. */ +static int maybe_do_queue_work() { + grpc_libevent_activation_data *work = g_activation_queue; + + if (work == NULL) return 0; + + if (work->next == work) { + g_activation_queue = NULL; + } else { + g_activation_queue = work->next; + g_activation_queue->prev = work->prev; + g_activation_queue->next->prev = g_activation_queue->prev->next = + g_activation_queue; + } + work->next = work->prev = NULL; + /* force status to cancelled from ok when shutting down */ + if (g_shutdown_backup_poller && work->status == GRPC_CALLBACK_SUCCESS) { + work->status = GRPC_CALLBACK_CANCELLED; + } + gpr_mu_unlock(&grpc_iomgr_mu); + + work->cb(work->arg, work->status); + + gpr_mu_lock(&grpc_iomgr_mu); + return 1; +} + +/* Break out of the event loop on timeout */ +static void timer_callback(int fd, short events, void *context) { + event_base_loopbreak((struct event_base *)context); +} + +static void break_callback(int fd, short events, void *context) { + event_base_loopbreak((struct event_base *)context); +} + +static void free_fd_list(grpc_fd *impl) { + while (impl != NULL) { + grpc_fd *current = impl; + impl = impl->next; + grpc_fd_impl_destroy(current); + current->on_done(current->on_done_user_data, GRPC_CALLBACK_SUCCESS); + gpr_free(current); + } +} + +static void maybe_free_fds() { + if (g_fds_to_free) { + free_fd_list(g_fds_to_free); + g_fds_to_free = NULL; + } +} + +void grpc_kick_poller() { event_active(g_break_ev, EV_READ, 0); } + +/* Spend some time doing polling and libevent maintenance work if no other + thread is. This includes both polling for events and destroying/closing file + descriptor objects. + Returns 1 if polling was performed, 0 otherwise. + Requires grpc_iomgr_mu locked, may unlock and relock during the call. */ +static int maybe_do_polling_work(struct timeval delay) { + int status; + + if (g_num_pollers) return 0; + + g_num_pollers = 1; + + maybe_free_fds(); + + gpr_mu_unlock(&grpc_iomgr_mu); + + event_add(g_timeout_ev, &delay); + status = event_base_loop(g_event_base, EVLOOP_ONCE); + if (status < 0) { + gpr_log(GPR_ERROR, "event polling loop stops with error status %d", status); + } + event_del(g_timeout_ev); + + gpr_mu_lock(&grpc_iomgr_mu); + maybe_free_fds(); + + g_num_pollers = 0; + gpr_cv_broadcast(&grpc_iomgr_cv); + return 1; +} + +static int maybe_do_alarm_work(gpr_timespec now, gpr_timespec next) { + int r = 0; + if (gpr_time_cmp(next, now) < 0) { + gpr_mu_unlock(&grpc_iomgr_mu); + r = grpc_alarm_check(now); + gpr_mu_lock(&grpc_iomgr_mu); + } + return r; +} + +int grpc_iomgr_work(gpr_timespec deadline) { + gpr_timespec now = gpr_now(); + gpr_timespec next = grpc_alarm_list_next_timeout(); + gpr_timespec delay_timespec = gpr_time_sub(deadline, now); + /* poll for no longer than one second */ + gpr_timespec max_delay = gpr_time_from_seconds(1); + struct timeval delay; + + if (gpr_time_cmp(delay_timespec, gpr_time_0) <= 0) { + return 0; + } + + if (gpr_time_cmp(delay_timespec, max_delay) > 0) { + delay_timespec = max_delay; + } + + /* Adjust delay to account for the next alarm, if applicable. */ + delay_timespec = gpr_time_min( + delay_timespec, gpr_time_sub(grpc_alarm_list_next_timeout(), now)); + + delay = gpr_timeval_from_timespec(delay_timespec); + + if (maybe_do_queue_work() || maybe_do_alarm_work(now, next) || + maybe_do_polling_work(delay)) { + g_last_poll_completed = gpr_now(); + return 1; + } + + return 0; +} + +static void backup_poller_thread(void *p) { + int backup_poller_engaged = 0; + /* allow no pollers for 100 milliseconds, then engage backup polling */ + gpr_timespec allow_no_pollers = gpr_time_from_millis(100); + + gpr_mu_lock(&grpc_iomgr_mu); + while (!g_shutdown_backup_poller) { + if (g_num_pollers == 0) { + gpr_timespec now = gpr_now(); + gpr_timespec time_until_engage = gpr_time_sub( + allow_no_pollers, gpr_time_sub(now, g_last_poll_completed)); + if (gpr_time_cmp(time_until_engage, gpr_time_0) <= 0) { + if (!backup_poller_engaged) { + gpr_log(GPR_DEBUG, "No pollers for a while - engaging backup poller"); + backup_poller_engaged = 1; + } + if (!maybe_do_queue_work()) { + gpr_timespec next = grpc_alarm_list_next_timeout(); + if (!maybe_do_alarm_work(now, next)) { + gpr_timespec deadline = + gpr_time_min(next, gpr_time_add(now, gpr_time_from_seconds(1))); + maybe_do_polling_work( + gpr_timeval_from_timespec(gpr_time_sub(deadline, now))); + } + } + } else { + if (backup_poller_engaged) { + gpr_log(GPR_DEBUG, "Backup poller disengaged"); + backup_poller_engaged = 0; + } + gpr_mu_unlock(&grpc_iomgr_mu); + gpr_sleep_until(gpr_time_add(now, time_until_engage)); + gpr_mu_lock(&grpc_iomgr_mu); + } + } else { + if (backup_poller_engaged) { + gpr_log(GPR_DEBUG, "Backup poller disengaged"); + backup_poller_engaged = 0; + } + gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, gpr_inf_future); + } + } + gpr_mu_unlock(&grpc_iomgr_mu); + + gpr_event_set(&g_backup_poller_done, (void *)1); +} + +void grpc_iomgr_init() { + gpr_thd_id backup_poller_id; + + if (evthread_use_threads() != 0) { + gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!"); + abort(); + } + + grpc_alarm_list_init(gpr_now()); + + gpr_mu_init(&grpc_iomgr_mu); + gpr_cv_init(&grpc_iomgr_cv); + g_activation_queue = NULL; + g_num_pollers = 0; + g_num_fds = 0; + g_num_address_resolutions = 0; + g_last_poll_completed = gpr_now(); + g_shutdown_backup_poller = 0; + g_fds_to_free = NULL; + + gpr_event_init(&g_backup_poller_done); + + g_event_base = NULL; + g_timeout_ev = NULL; + g_break_ev = NULL; + + g_event_base = event_base_new(); + if (!g_event_base) { + gpr_log(GPR_ERROR, "Failed to create the event base"); + abort(); + } + + if (evthread_make_base_notifiable(g_event_base) != 0) { + gpr_log(GPR_ERROR, "Couldn't make event base notifiable cross threads!"); + abort(); + } + + g_timeout_ev = evtimer_new(g_event_base, timer_callback, g_event_base); + g_break_ev = event_new(g_event_base, -1, EV_READ | EV_PERSIST, break_callback, + g_event_base); + + event_add(g_break_ev, NULL); + + gpr_thd_new(&backup_poller_id, backup_poller_thread, NULL, NULL); +} + +void grpc_iomgr_shutdown() { + gpr_timespec fd_shutdown_deadline = + gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); + + /* broadcast shutdown */ + gpr_mu_lock(&grpc_iomgr_mu); + while (g_num_fds > 0 || g_num_address_resolutions > 0) { + gpr_log(GPR_INFO, + "waiting for %d fds and %d name resolutions to be destroyed before " + "closing event manager", + g_num_fds, g_num_address_resolutions); + if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, fd_shutdown_deadline)) { + gpr_log(GPR_ERROR, + "not all fds or name resolutions destroyed before shutdown " + "deadline: memory leaks " + "are likely"); + break; + } else if (g_num_fds == 0 && g_num_address_resolutions == 0) { + gpr_log(GPR_INFO, "all fds closed, all name resolutions finished"); + } + } + + g_shutdown_backup_poller = 1; + gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_mu_unlock(&grpc_iomgr_mu); + + gpr_event_wait(&g_backup_poller_done, gpr_inf_future); + + grpc_alarm_list_shutdown(); + + /* drain pending work */ + gpr_mu_lock(&grpc_iomgr_mu); + while (maybe_do_queue_work()) + ; + gpr_mu_unlock(&grpc_iomgr_mu); + + free_fd_list(g_fds_to_free); + + /* complete shutdown */ + gpr_mu_destroy(&grpc_iomgr_mu); + gpr_cv_destroy(&grpc_iomgr_cv); + + if (g_timeout_ev != NULL) { + event_free(g_timeout_ev); + } + + if (g_break_ev != NULL) { + event_free(g_break_ev); + } + + if (g_event_base != NULL) { + event_base_free(g_event_base); + g_event_base = NULL; + } +} + +static void add_task(grpc_libevent_activation_data *adata) { + gpr_mu_lock(&grpc_iomgr_mu); + if (g_activation_queue) { + adata->next = g_activation_queue; + adata->prev = adata->next->prev; + adata->next->prev = adata->prev->next = adata; + } else { + g_activation_queue = adata; + adata->next = adata->prev = adata; + } + gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_mu_unlock(&grpc_iomgr_mu); +} + +static void grpc_fd_impl_destroy(grpc_fd *impl) { + grpc_em_task_activity_type type; + grpc_libevent_activation_data *adata; + + for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) { + adata = &(impl->task.activation[type]); + GPR_ASSERT(adata->next == NULL); + if (adata->ev != NULL) { + event_free(adata->ev); + adata->ev = NULL; + } + } + + if (impl->shutdown_ev != NULL) { + event_free(impl->shutdown_ev); + impl->shutdown_ev = NULL; + } + gpr_mu_destroy(&impl->mu); + close(impl->fd); +} + +/* Proxy callback to call a gRPC read/write callback */ +static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) { + grpc_fd *em_fd = arg; + grpc_iomgr_cb_status status = GRPC_CALLBACK_SUCCESS; + int run_read_cb = 0; + int run_write_cb = 0; + grpc_libevent_activation_data *rdata, *wdata; + + gpr_mu_lock(&em_fd->mu); + if (em_fd->shutdown_started) { + status = GRPC_CALLBACK_CANCELLED; + } else if (status == GRPC_CALLBACK_SUCCESS && (what & EV_TIMEOUT)) { + status = GRPC_CALLBACK_TIMED_OUT; + /* TODO(klempner): This is broken if we are monitoring both read and write + events on the same fd -- generating a spurious event is okay, but + generating a spurious timeout is not. */ + what |= (EV_READ | EV_WRITE); + } + + if (what & EV_READ) { + switch (em_fd->read_state) { + case GRPC_FD_WAITING: + run_read_cb = 1; + em_fd->read_state = GRPC_FD_IDLE; + break; + case GRPC_FD_IDLE: + case GRPC_FD_CACHED: + em_fd->read_state = GRPC_FD_CACHED; + } + } + if (what & EV_WRITE) { + switch (em_fd->write_state) { + case GRPC_FD_WAITING: + run_write_cb = 1; + em_fd->write_state = GRPC_FD_IDLE; + break; + case GRPC_FD_IDLE: + case GRPC_FD_CACHED: + em_fd->write_state = GRPC_FD_CACHED; + } + } + + if (run_read_cb) { + rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]); + rdata->status = status; + add_task(rdata); + } else if (run_write_cb) { + wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]); + wdata->status = status; + add_task(wdata); + } + gpr_mu_unlock(&em_fd->mu); +} + +static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) { + /* TODO(klempner): This could just run directly in the calling thread, except + that libevent's handling of event_active() on an event which is already in + flight on a different thread is racy and easily triggers TSAN. + */ + grpc_fd *impl = arg; + gpr_mu_lock(&impl->mu); + impl->shutdown_started = 1; + if (impl->read_state == GRPC_FD_WAITING) { + event_active(impl->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1); + } + if (impl->write_state == GRPC_FD_WAITING) { + event_active(impl->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1); + } + gpr_mu_unlock(&impl->mu); +} + +grpc_fd *grpc_fd_create(int fd) { + int flags; + grpc_libevent_activation_data *rdata, *wdata; + grpc_fd *impl = gpr_malloc(sizeof(grpc_fd)); + + gpr_mu_lock(&grpc_iomgr_mu); + g_num_fds++; + gpr_mu_unlock(&grpc_iomgr_mu); + + impl->shutdown_ev = NULL; + gpr_mu_init(&impl->mu); + + flags = fcntl(fd, F_GETFL, 0); + GPR_ASSERT((flags & O_NONBLOCK) != 0); + + impl->task.type = GRPC_EM_TASK_FD; + impl->fd = fd; + + rdata = &(impl->task.activation[GRPC_EM_TA_READ]); + rdata->ev = NULL; + rdata->cb = NULL; + rdata->arg = NULL; + rdata->status = GRPC_CALLBACK_SUCCESS; + rdata->prev = NULL; + rdata->next = NULL; + + wdata = &(impl->task.activation[GRPC_EM_TA_WRITE]); + wdata->ev = NULL; + wdata->cb = NULL; + wdata->arg = NULL; + wdata->status = GRPC_CALLBACK_SUCCESS; + wdata->prev = NULL; + wdata->next = NULL; + + impl->read_state = GRPC_FD_IDLE; + impl->write_state = GRPC_FD_IDLE; + + impl->shutdown_started = 0; + impl->next = NULL; + + /* TODO(chenw): detect platforms where only level trigger is supported, + and set the event to non-persist. */ + rdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ, + em_fd_cb, impl); + GPR_ASSERT(rdata->ev); + + wdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE, + em_fd_cb, impl); + GPR_ASSERT(wdata->ev); + + impl->shutdown_ev = + event_new(g_event_base, -1, EV_READ, em_fd_shutdown_cb, impl); + GPR_ASSERT(impl->shutdown_ev); + + return impl; +} + +static void do_nothing(void *ignored, grpc_iomgr_cb_status also_ignored) {} + +void grpc_fd_destroy(grpc_fd *impl, grpc_iomgr_cb_func on_done, + void *user_data) { + if (on_done == NULL) on_done = do_nothing; + + gpr_mu_lock(&grpc_iomgr_mu); + + /* Put the impl on the list to be destroyed by the poller. */ + impl->on_done = on_done; + impl->on_done_user_data = user_data; + impl->next = g_fds_to_free; + g_fds_to_free = impl; + /* TODO(ctiller): kick the poller so it destroys this fd promptly + (currently we may wait up to a second) */ + + g_num_fds--; + gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_mu_unlock(&grpc_iomgr_mu); +} + +int grpc_fd_get(struct grpc_fd *em_fd) { return em_fd->fd; } + +/* TODO(chenw): should we enforce the contract that notify_on_read cannot be + called when the previously registered callback has not been called yet. */ +int grpc_fd_notify_on_read(grpc_fd *impl, grpc_iomgr_cb_func read_cb, + void *read_cb_arg, gpr_timespec deadline) { + int force_event = 0; + grpc_libevent_activation_data *rdata; + gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); + struct timeval delay = gpr_timeval_from_timespec(delay_timespec); + struct timeval *delayp = + gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL; + + rdata = &impl->task.activation[GRPC_EM_TA_READ]; + + gpr_mu_lock(&impl->mu); + rdata->cb = read_cb; + rdata->arg = read_cb_arg; + + force_event = (impl->shutdown_started || impl->read_state == GRPC_FD_CACHED); + impl->read_state = GRPC_FD_WAITING; + + if (force_event) { + event_active(rdata->ev, EV_READ, 1); + } else if (event_add(rdata->ev, delayp) == -1) { + gpr_mu_unlock(&impl->mu); + return 0; + } + gpr_mu_unlock(&impl->mu); + return 1; +} + +int grpc_fd_notify_on_write(grpc_fd *impl, grpc_iomgr_cb_func write_cb, + void *write_cb_arg, gpr_timespec deadline) { + int force_event = 0; + grpc_libevent_activation_data *wdata; + gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); + struct timeval delay = gpr_timeval_from_timespec(delay_timespec); + struct timeval *delayp = + gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL; + + wdata = &impl->task.activation[GRPC_EM_TA_WRITE]; + + gpr_mu_lock(&impl->mu); + wdata->cb = write_cb; + wdata->arg = write_cb_arg; + + force_event = (impl->shutdown_started || impl->write_state == GRPC_FD_CACHED); + impl->write_state = GRPC_FD_WAITING; + + if (force_event) { + event_active(wdata->ev, EV_WRITE, 1); + } else if (event_add(wdata->ev, delayp) == -1) { + gpr_mu_unlock(&impl->mu); + return 0; + } + gpr_mu_unlock(&impl->mu); + return 1; +} + +void grpc_fd_shutdown(grpc_fd *em_fd) { + event_active(em_fd->shutdown_ev, EV_READ, 1); +} + +/* Sometimes we want a followup callback: something to be added from the + current callback for the EM to invoke once this callback is complete. + This is implemented by inserting an entry into an EM queue. */ + +/* The following structure holds the field needed for adding the + followup callback. These are the argument for the followup callback, + the function to use for the followup callback, and the + activation data pointer used for the queues (to free in the CB) */ +struct followup_callback_arg { + grpc_iomgr_cb_func func; + void *cb_arg; + grpc_libevent_activation_data adata; +}; + +static void followup_proxy_callback(void *cb_arg, grpc_iomgr_cb_status status) { + struct followup_callback_arg *fcb_arg = cb_arg; + /* Invoke the function */ + fcb_arg->func(fcb_arg->cb_arg, status); + gpr_free(fcb_arg); +} + +void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) { + grpc_libevent_activation_data *adptr; + struct followup_callback_arg *fcb_arg; + + fcb_arg = gpr_malloc(sizeof(*fcb_arg)); + /* Set up the activation data and followup callback argument structures */ + adptr = &fcb_arg->adata; + adptr->ev = NULL; + adptr->cb = followup_proxy_callback; + adptr->arg = fcb_arg; + adptr->status = GRPC_CALLBACK_SUCCESS; + adptr->prev = NULL; + adptr->next = NULL; + + fcb_arg->func = cb; + fcb_arg->cb_arg = cb_arg; + + /* Insert an activation data for the specified em */ + add_task(adptr); +} diff --git a/src/core/iomgr/iomgr_libevent.h b/src/core/iomgr/iomgr_libevent.h new file mode 100644 index 0000000000..5c088006a0 --- /dev/null +++ b/src/core/iomgr/iomgr_libevent.h @@ -0,0 +1,206 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ +#define __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ + +#include "src/core/iomgr/iomgr.h" +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +typedef struct grpc_fd grpc_fd; + +/* gRPC event manager task "base class". This is pretend-inheritance in C89. + This should be the first member of any actual grpc_em task type. + + Memory warning: expanding this will increase memory usage in any derived + class, so be careful. + + For generality, this base can be on multiple task queues and can have + multiple event callbacks registered. Not all "derived classes" will use + this feature. */ + +typedef enum grpc_libevent_task_type { + GRPC_EM_TASK_ALARM, + GRPC_EM_TASK_FD, + GRPC_EM_TASK_DO_NOT_USE +} grpc_libevent_task_type; + +/* Different activity types to shape the callback and queueing arrays */ +typedef enum grpc_em_task_activity_type { + GRPC_EM_TA_READ, /* use this also for single-type events */ + GRPC_EM_TA_WRITE, + GRPC_EM_TA_COUNT +} grpc_em_task_activity_type; + +/* Include the following #define for convenience for tasks like alarms that + only have a single type */ +#define GRPC_EM_TA_ONLY GRPC_EM_TA_READ + +typedef struct grpc_libevent_activation_data { + struct event *ev; /* event activated on this callback type */ + grpc_iomgr_cb_func cb; /* function pointer for callback */ + void *arg; /* argument passed to cb */ + + /* Hold the status associated with the callback when queued */ + grpc_iomgr_cb_status status; + /* Now set up to link activations into scheduler queues */ + struct grpc_libevent_activation_data *prev; + struct grpc_libevent_activation_data *next; +} grpc_libevent_activation_data; + +typedef struct grpc_libevent_task { + grpc_libevent_task_type type; + + /* Now have an array of activation data elements: one for each activity + type that could get activated */ + grpc_libevent_activation_data activation[GRPC_EM_TA_COUNT]; +} grpc_libevent_task; + +/* Initialize *em_fd. + Requires fd is a non-blocking file descriptor. + + This takes ownership of closing fd. + + Requires: *em_fd uninitialized. fd is a non-blocking file descriptor. */ +grpc_fd *grpc_fd_create(int fd); + +/* Cause *em_fd no longer to be initialized and closes the underlying fd. + on_done is called when the underlying file descriptor is definitely close()d. + If on_done is NULL, no callback will be made. + Requires: *em_fd initialized; no outstanding notify_on_read or + notify_on_write. */ +void grpc_fd_destroy(grpc_fd *em_fd, grpc_iomgr_cb_func on_done, + void *user_data); + +/* Returns the file descriptor associated with *em_fd. */ +int grpc_fd_get(grpc_fd *em_fd); + +/* Register read interest, causing read_cb to be called once when em_fd becomes + readable, on deadline specified by deadline, or on shutdown triggered by + grpc_fd_shutdown. + read_cb will be called with read_cb_arg when *em_fd becomes readable. + read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable, + GRPC_CALLBACK_TIMED_OUT if the call timed out, + and CANCELLED if the call was cancelled. + + Requires:This method must not be called before the read_cb for any previous + call runs. Edge triggered events are used whenever they are supported by the + underlying platform. This means that users must drain em_fd in read_cb before + calling notify_on_read again. Users are also expected to handle spurious + events, i.e read_cb is called while nothing can be readable from em_fd */ +int grpc_fd_notify_on_read(grpc_fd *em_fd, grpc_iomgr_cb_func read_cb, + void *read_cb_arg, gpr_timespec deadline); + +/* Exactly the same semantics as above, except based on writable events. */ +int grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, + void *write_cb_arg, gpr_timespec deadline); + +/* Cause any current and all future read/write callbacks to error out with + GRPC_CALLBACK_CANCELLED. */ +void grpc_fd_shutdown(grpc_fd *em_fd); + +/* =================== Event caching =================== + In order to not miss or double-return edges in the context of edge triggering + and multithreading, we need a per-fd caching layer in the eventmanager itself + to cache relevant events. + + There are two types of events we care about: calls to notify_on_[read|write] + and readable/writable events for the socket from eventfd. There are separate + event caches for read and write. + + There are three states: + 0. "waiting" -- There's been a call to notify_on_[read|write] which has not + had a corresponding event. In other words, we're waiting for an event so we + can run the callback. + 1. "idle" -- We are neither waiting nor have a cached event. + 2. "cached" -- There has been a read/write event without a waiting callback, + so we want to run the event next time the application calls + notify_on_[read|write]. + + The high level state diagram: + + +--------------------------------------------------------------------+ + | WAITING | IDLE | CACHED | + | | | | + | 1. --*-> 2. --+-> 3. --+\ + | | | <--+/ + | | | | + x+-- 6. 5. <-+-- 4. <-*-- | + | | | | + +--------------------------------------------------------------------+ + + Transitions right occur on read|write events. Transitions left occur on + notify_on_[read|write] events. + State transitions: + 1. Read|Write event while waiting -> run the callback and transition to idle. + 2. Read|Write event while idle -> transition to cached. + 3. Read|Write event with one already cached -> still cached. + 4. notify_on_[read|write] with event cached: run callback and transition to + idle. + 5. notify_on_[read|write] when idle: Store callback and transition to + waiting. + 6. notify_on_[read|write] when waiting: invalid. */ + +typedef enum grpc_fd_state { + GRPC_FD_WAITING = 0, + GRPC_FD_IDLE = 1, + GRPC_FD_CACHED = 2 +} grpc_fd_state; + +/* gRPC file descriptor handle. + The handle is used to register read/write callbacks to a file descriptor */ +struct grpc_fd { + grpc_libevent_task task; /* Base class, callbacks, queues, etc */ + int fd; /* File descriptor */ + + /* Note that the shutdown event is only needed as a workaround for libevent + not properly handling event_active on an in flight event. */ + struct event *shutdown_ev; /* activated to trigger shutdown */ + + /* protect shutdown_started|read_state|write_state and ensure barriers + between notify_on_[read|write] and read|write callbacks */ + gpr_mu mu; + int shutdown_started; /* 0 -> shutdown not started, 1 -> started */ + grpc_fd_state read_state; + grpc_fd_state write_state; + + /* descriptor delete list. These are destroyed during polling. */ + struct grpc_fd *next; + grpc_iomgr_cb_func on_done; + void *on_done_user_data; +}; + +void grpc_iomgr_ref_address_resolution(int delta); + +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */ diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_libevent_use_threads.c index 5f72542777..af449342f0 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_libevent_use_threads.c @@ -31,21 +31,26 @@ * */ -#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ -#define __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ - -#include "src/core/iomgr/iomgr.h" -#include "src/core/iomgr/iomgr_internal.h" +/* Posix grpc event manager support code. */ +#include <grpc/support/log.h> #include <grpc/support/sync.h> +#include <event2/thread.h> -int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success); -void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, - int success); - -void grpc_iomgr_ref(); -void grpc_iomgr_unref(); - -void grpc_iomgr_platform_init(); -void grpc_iomgr_platform_shutdown(); +static int error_code = 0; +static gpr_once threads_once = GPR_ONCE_INIT; +static void evthread_threads_initialize(void) { + error_code = evthread_use_pthreads(); + if (error_code) { + gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!"); + } +} -#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ */ +/* Notify LibEvent that Posix pthread is used. */ +int evthread_use_threads() { + gpr_once_init(&threads_once, &evthread_threads_initialize); + /* For Pthreads or Windows threads, Libevent provides simple APIs to set + mutexes and conditional variables to support cross thread operations. + For other platforms, LibEvent provide callback APIs to hook mutexes and + conditional variables. */ + return error_code; +} diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/pollset.c index ff9195ec1d..62a0019eb3 100644 --- a/src/core/iomgr/iomgr_posix.c +++ b/src/core/iomgr/pollset.c @@ -31,8 +31,7 @@ * */ -#include "src/core/iomgr/iomgr_posix.h" +#include "src/core/iomgr/pollset.h" -void grpc_iomgr_platform_init() { grpc_pollset_global_init(); } - -void grpc_iomgr_platform_shutdown() { grpc_pollset_global_shutdown(); } +void grpc_pollset_init(grpc_pollset *pollset) { pollset->unused = 0; } +void grpc_pollset_destroy(grpc_pollset *pollset) {} diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 7374a4ec13..ba1a9d5429 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -34,31 +34,18 @@ #ifndef __GRPC_INTERNAL_IOMGR_POLLSET_H_ #define __GRPC_INTERNAL_IOMGR_POLLSET_H_ -#include <grpc/support/port_platform.h> - /* A grpc_pollset is a set of file descriptors that a higher level item is interested in. For example: - a server will typically keep a pollset containing all connected channels, so that it can find new calls to service - a completion queue might keep a pollset with an entry for each transport that is servicing a call that it's tracking */ - -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/pollset_posix.h" -#endif +/* Eventually different implementations of iomgr will provide their own + grpc_pollset structs. As this is just a dummy wrapper to get the API in, + we just define a simple type here. */ +typedef struct { char unused; } grpc_pollset; void grpc_pollset_init(grpc_pollset *pollset); void grpc_pollset_destroy(grpc_pollset *pollset); -/* Do some work on a pollset. - May involve invoking asynchronous callbacks, or actually polling file - descriptors. - Requires GRPC_POLLSET_MU(pollset) locked. - May unlock GRPC_POLLSET_MU(pollset) during its execution. */ -int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline); - -/* Break a pollset out of polling work - Requires GRPC_POLLSET_MU(pollset) locked. */ -void grpc_pollset_kick(grpc_pollset *pollset); - #endif /* __GRPC_INTERNAL_IOMGR_POLLSET_H_ */ diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c deleted file mode 100644 index e482da94f7..0000000000 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ /dev/null @@ -1,239 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_POSIX_MULTIPOLL_WITH_POLL - -#include "src/core/iomgr/pollset_posix.h" - -#include <errno.h> -#include <poll.h> -#include <stdlib.h> -#include <string.h> - -#include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/useful.h> - -typedef struct { - /* all polled fds */ - size_t fd_count; - size_t fd_capacity; - grpc_fd **fds; - /* fds being polled by the current poller: parallel arrays of pollfd and the - * grpc_fd* that the pollfd was constructed from */ - size_t pfd_count; - size_t pfd_capacity; - grpc_fd **selfds; - struct pollfd *pfds; - /* fds that have been removed from the pollset explicitly */ - size_t del_count; - size_t del_capacity; - grpc_fd **dels; -} pollset_hdr; - -static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, - grpc_fd *fd) { - size_t i; - pollset_hdr *h = pollset->data.ptr; - /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ - for (i = 0; i < h->fd_count; i++) { - if (h->fds[i] == fd) return; - } - if (h->fd_count == h->fd_capacity) { - h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2); - h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity); - } - h->fds[h->fd_count++] = fd; - grpc_fd_ref(fd); -} - -static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, - grpc_fd *fd) { - /* will get removed next poll cycle */ - pollset_hdr *h = pollset->data.ptr; - if (h->del_count == h->del_capacity) { - h->del_capacity = GPR_MAX(h->del_capacity + 8, h->del_count * 3 / 2); - h->dels = gpr_realloc(h->dels, sizeof(grpc_fd *) * h->del_capacity); - } - h->dels[h->del_count++] = fd; - grpc_fd_ref(fd); -} - -static void end_polling(grpc_pollset *pollset) { - size_t i; - pollset_hdr *h; - h = pollset->data.ptr; - for (i = 1; i < h->pfd_count; i++) { - grpc_fd_end_poll(h->selfds[i], pollset); - } -} - -static int multipoll_with_poll_pollset_maybe_work( - grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, - int allow_synchronous_callback) { - int timeout; - int r; - size_t i, np, nf, nd; - pollset_hdr *h; - - if (pollset->counter) { - return 0; - } - h = pollset->data.ptr; - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { - timeout = -1; - } else { - timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout <= 0) { - return 1; - } - } - if (h->pfd_capacity < h->fd_count + 1) { - h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1); - gpr_free(h->pfds); - gpr_free(h->selfds); - h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity); - h->selfds = gpr_malloc(sizeof(grpc_fd *) * h->pfd_capacity); - } - nf = 0; - np = 1; - h->pfds[0].fd = grpc_kick_read_fd(pollset); - h->pfds[0].events = POLLIN; - h->pfds[0].revents = POLLOUT; - for (i = 0; i < h->fd_count; i++) { - int remove = grpc_fd_is_orphaned(h->fds[i]); - for (nd = 0; nd < h->del_count; nd++) { - if (h->fds[i] == h->dels[nd]) remove = 1; - } - if (remove) { - grpc_fd_unref(h->fds[i]); - } else { - h->fds[nf++] = h->fds[i]; - h->pfds[np].events = - grpc_fd_begin_poll(h->fds[i], pollset, POLLIN, POLLOUT); - h->selfds[np] = h->fds[i]; - h->pfds[np].fd = h->fds[i]->fd; - h->pfds[np].revents = 0; - np++; - } - } - h->pfd_count = np; - h->fd_count = nf; - for (nd = 0; nd < h->del_count; nd++) { - grpc_fd_unref(h->dels[nd]); - } - h->del_count = 0; - if (h->pfd_count == 0) { - end_polling(pollset); - return 0; - } - pollset->counter = 1; - gpr_mu_unlock(&pollset->mu); - - r = poll(h->pfds, h->pfd_count, timeout); - if (r < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); - } - } else if (r == 0) { - /* do nothing */ - } else { - if (h->pfds[0].revents & POLLIN) { - grpc_kick_drain(pollset); - } - for (i = 1; i < np; i++) { - if (h->pfds[i].revents & POLLIN) { - grpc_fd_become_readable(h->selfds[i], allow_synchronous_callback); - } - if (h->pfds[i].revents & POLLOUT) { - grpc_fd_become_writable(h->selfds[i], allow_synchronous_callback); - } - } - } - end_polling(pollset); - - gpr_mu_lock(&pollset->mu); - pollset->counter = 0; - gpr_cv_broadcast(&pollset->cv); - return 1; -} - -static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { - size_t i; - pollset_hdr *h = pollset->data.ptr; - GPR_ASSERT(pollset->counter == 0); - for (i = 0; i < h->fd_count; i++) { - grpc_fd_unref(h->fds[i]); - } - for (i = 0; i < h->del_count; i++) { - grpc_fd_unref(h->dels[i]); - } - gpr_free(h->pfds); - gpr_free(h->selfds); - gpr_free(h->fds); - gpr_free(h->dels); - gpr_free(h); -} - -static const grpc_pollset_vtable multipoll_with_poll_pollset = { - multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd, - multipoll_with_poll_pollset_maybe_work, - multipoll_with_poll_pollset_destroy}; - -void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, - size_t nfds) { - size_t i; - pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); - pollset->vtable = &multipoll_with_poll_pollset; - pollset->data.ptr = h; - h->fd_count = nfds; - h->fd_capacity = nfds; - h->fds = gpr_malloc(nfds * sizeof(grpc_fd *)); - h->pfd_count = 0; - h->pfd_capacity = 0; - h->pfds = NULL; - h->selfds = NULL; - h->del_count = 0; - h->del_capacity = 0; - h->dels = NULL; - for (i = 0; i < nfds; i++) { - h->fds[i] = fds[i]; - grpc_fd_ref(fds[i]); - } -} - -#endif diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c deleted file mode 100644 index ff00e06429..0000000000 --- a/src/core/iomgr/pollset_posix.c +++ /dev/null @@ -1,342 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/iomgr/pollset_posix.h" - -#include <errno.h> -#include <poll.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> - -#include "src/core/iomgr/alarm_internal.h" -#include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/socket_utils_posix.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/thd.h> -#include <grpc/support/useful.h> - -/* kick pipes: we keep a sharded set of pipes to allow breaking from poll. - Ideally this would be 1:1 with pollsets, but we'd like to avoid associating - full kernel objects with each pollset to keep them lightweight, so instead - keep a sharded set and allow associating a pollset with one of the shards. - - TODO(ctiller): move this out from this file, and allow an eventfd - implementation on linux */ - -#define LOG2_KICK_SHARDS 6 -#define KICK_SHARDS (1 << LOG2_KICK_SHARDS) - -static int g_kick_pipes[KICK_SHARDS][2]; -static grpc_pollset g_backup_pollset; -static int g_shutdown_backup_poller; -static gpr_event g_backup_poller_done; - -static void backup_poller(void *p) { - gpr_timespec delta = gpr_time_from_millis(100); - gpr_timespec last_poll = gpr_now(); - - gpr_mu_lock(&g_backup_pollset.mu); - while (g_shutdown_backup_poller == 0) { - gpr_timespec next_poll = gpr_time_add(last_poll, delta); - grpc_pollset_work(&g_backup_pollset, next_poll); - gpr_mu_unlock(&g_backup_pollset.mu); - gpr_sleep_until(next_poll); - gpr_mu_lock(&g_backup_pollset.mu); - last_poll = next_poll; - } - gpr_mu_unlock(&g_backup_pollset.mu); - - gpr_event_set(&g_backup_poller_done, (void *)1); -} - -static size_t kick_shard(const grpc_pollset *info) { - size_t x = (size_t)info; - return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (KICK_SHARDS - 1); -} - -int grpc_kick_read_fd(grpc_pollset *p) { - return g_kick_pipes[kick_shard(p)][0]; -} - -static int grpc_kick_write_fd(grpc_pollset *p) { - return g_kick_pipes[kick_shard(p)][1]; -} - -void grpc_pollset_force_kick(grpc_pollset *p) { - char c = 0; - while (write(grpc_kick_write_fd(p), &c, 1) != 1 && errno == EINTR) - ; -} - -void grpc_pollset_kick(grpc_pollset *p) { - if (!p->counter) return; - grpc_pollset_force_kick(p); -} - -void grpc_kick_drain(grpc_pollset *p) { - int fd = grpc_kick_read_fd(p); - char buf[128]; - int r; - - for (;;) { - r = read(fd, buf, sizeof(buf)); - if (r > 0) continue; - if (r == 0) return; - switch (errno) { - case EAGAIN: - return; - case EINTR: - continue; - default: - gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno)); - return; - } - } -} - -/* global state management */ - -grpc_pollset *grpc_backup_pollset() { return &g_backup_pollset; } - -void grpc_pollset_global_init() { - int i; - gpr_thd_id id; - - /* initialize the kick shards */ - for (i = 0; i < KICK_SHARDS; i++) { - GPR_ASSERT(0 == pipe(g_kick_pipes[i])); - GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][0], 1)); - GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][1], 1)); - } - - /* initialize the backup pollset */ - grpc_pollset_init(&g_backup_pollset); - - /* start the backup poller thread */ - g_shutdown_backup_poller = 0; - gpr_event_init(&g_backup_poller_done); - gpr_thd_new(&id, backup_poller, NULL, NULL); -} - -void grpc_pollset_global_shutdown() { - int i; - - /* terminate the backup poller thread */ - gpr_mu_lock(&g_backup_pollset.mu); - g_shutdown_backup_poller = 1; - gpr_mu_unlock(&g_backup_pollset.mu); - gpr_event_wait(&g_backup_poller_done, gpr_inf_future); - - /* destroy the backup pollset */ - grpc_pollset_destroy(&g_backup_pollset); - - /* destroy the kick shards */ - for (i = 0; i < KICK_SHARDS; i++) { - close(g_kick_pipes[i][0]); - close(g_kick_pipes[i][1]); - } -} - -/* main interface */ - -static void become_empty_pollset(grpc_pollset *pollset); -static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd); - -void grpc_pollset_init(grpc_pollset *pollset) { - gpr_mu_init(&pollset->mu); - gpr_cv_init(&pollset->cv); - become_empty_pollset(pollset); -} - -void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { - gpr_mu_lock(&pollset->mu); - pollset->vtable->add_fd(pollset, fd); - gpr_cv_broadcast(&pollset->cv); - gpr_mu_unlock(&pollset->mu); -} - -void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { - gpr_mu_lock(&pollset->mu); - pollset->vtable->del_fd(pollset, fd); - gpr_cv_broadcast(&pollset->cv); - gpr_mu_unlock(&pollset->mu); -} - -int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { - /* pollset->mu already held */ - gpr_timespec now; - now = gpr_now(); - if (gpr_time_cmp(now, deadline) > 0) { - return 0; - } - if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) { - return 1; - } - if (grpc_alarm_check(&pollset->mu, now, &deadline)) { - return 1; - } - return pollset->vtable->maybe_work(pollset, deadline, now, 1); -} - -void grpc_pollset_destroy(grpc_pollset *pollset) { - pollset->vtable->destroy(pollset); - gpr_mu_destroy(&pollset->mu); - gpr_cv_destroy(&pollset->cv); -} - -/* - * empty_pollset - a vtable that provides polling for NO file descriptors - */ - -static void empty_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { - become_unary_pollset(pollset, fd); -} - -static void empty_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {} - -static int empty_pollset_maybe_work(grpc_pollset *pollset, - gpr_timespec deadline, gpr_timespec now, - int allow_synchronous_callback) { - return 0; -} - -static void empty_pollset_destroy(grpc_pollset *pollset) {} - -static const grpc_pollset_vtable empty_pollset = { - empty_pollset_add_fd, empty_pollset_del_fd, empty_pollset_maybe_work, - empty_pollset_destroy}; - -static void become_empty_pollset(grpc_pollset *pollset) { - pollset->vtable = &empty_pollset; -} - -/* - * unary_poll_pollset - a vtable that provides polling for one file descriptor - * via poll() - */ - -static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { - grpc_fd *fds[2]; - if (fd == pollset->data.ptr) return; - fds[0] = pollset->data.ptr; - fds[1] = fd; - grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); - grpc_fd_unref(fds[0]); -} - -static void unary_poll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { - if (fd == pollset->data.ptr) { - grpc_fd_unref(pollset->data.ptr); - become_empty_pollset(pollset); - } -} - -static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, - gpr_timespec deadline, - gpr_timespec now, - int allow_synchronous_callback) { - struct pollfd pfd[2]; - grpc_fd *fd; - int timeout; - int r; - - if (pollset->counter) { - return 0; - } - fd = pollset->data.ptr; - if (grpc_fd_is_orphaned(fd)) { - grpc_fd_unref(fd); - become_empty_pollset(pollset); - return 0; - } - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { - timeout = -1; - } else { - timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout <= 0) { - return 1; - } - } - pfd[0].fd = grpc_kick_read_fd(pollset); - pfd[0].events = POLLIN; - pfd[0].revents = 0; - pfd[1].fd = fd->fd; - pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT); - pfd[1].revents = 0; - pollset->counter = 1; - gpr_mu_unlock(&pollset->mu); - - r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout); - if (r < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); - } - } else if (r == 0) { - /* do nothing */ - } else { - if (pfd[0].revents & POLLIN) { - grpc_kick_drain(pollset); - } - if (pfd[1].revents & POLLIN) { - grpc_fd_become_readable(fd, allow_synchronous_callback); - } - if (pfd[1].revents & POLLOUT) { - grpc_fd_become_writable(fd, allow_synchronous_callback); - } - } - - gpr_mu_lock(&pollset->mu); - grpc_fd_end_poll(fd, pollset); - pollset->counter = 0; - gpr_cv_broadcast(&pollset->cv); - return 1; -} - -static void unary_poll_pollset_destroy(grpc_pollset *pollset) { - GPR_ASSERT(pollset->counter == 0); - grpc_fd_unref(pollset->data.ptr); -} - -static const grpc_pollset_vtable unary_poll_pollset = { - unary_poll_pollset_add_fd, unary_poll_pollset_del_fd, - unary_poll_pollset_maybe_work, unary_poll_pollset_destroy}; - -static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd) { - pollset->vtable = &unary_poll_pollset; - pollset->counter = 0; - pollset->data.ptr = fd; - grpc_fd_ref(fd); -} diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h deleted file mode 100644 index f051079f5b..0000000000 --- a/src/core/iomgr/pollset_posix.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ -#define __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ - -#include <grpc/support/sync.h> - -typedef struct grpc_pollset_vtable grpc_pollset_vtable; - -/* forward declare only in this file to avoid leaking impl details via - pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not - use the struct tag */ -struct grpc_fd; - -typedef struct grpc_pollset { - /* pollsets under posix can mutate representation as fds are added and - removed. - For example, we may choose a poll() based implementation on linux for - few fds, and an epoll() based implementation for many fds */ - const grpc_pollset_vtable *vtable; - gpr_mu mu; - gpr_cv cv; - int counter; - union { - int fd; - void *ptr; - } data; -} grpc_pollset; - -struct grpc_pollset_vtable { - void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd); - void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd); - int (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline, - gpr_timespec now, int allow_synchronous_callback); - void (*destroy)(grpc_pollset *pollset); -}; - -#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) -#define GRPC_POLLSET_CV(pollset) (&(pollset)->cv) - -/* Add an fd to a pollset */ -void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd); -/* Force remove an fd from a pollset (normally they are removed on the next - poll after an fd is orphaned) */ -void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd); - -/* Force any current pollers to break polling */ -void grpc_pollset_force_kick(grpc_pollset *pollset); -/* Returns the fd to listen on for kicks */ -int grpc_kick_read_fd(grpc_pollset *p); -/* Call after polling has been kicked to leave the kicked state */ -void grpc_kick_drain(grpc_pollset *p); - -/* All fds get added to a backup pollset to ensure that progress is made - regardless of applications listening to events. Relying on this is slow - however (the backup pollset only listens every 100ms or so) - so it's not - to be relied on. */ -grpc_pollset *grpc_backup_pollset(); - -/* turn a pollset into a multipoller: platform specific */ -void grpc_platform_become_multipoller(grpc_pollset *pollset, - struct grpc_fd **fds, size_t fd_count); - -#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index c9c2c5378a..a0a04297eb 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -41,7 +41,7 @@ #include <unistd.h> #include <string.h> -#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/iomgr_libevent.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include <grpc/support/alloc.h> @@ -201,7 +201,7 @@ static void do_request(void *rp) { gpr_free(r->default_port); gpr_free(r); cb(arg, resolved); - grpc_iomgr_unref(); + grpc_iomgr_ref_address_resolution(-1); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { @@ -213,7 +213,7 @@ void grpc_resolve_address(const char *name, const char *default_port, grpc_resolve_cb cb, void *arg) { request *r = gpr_malloc(sizeof(request)); gpr_thd_id id; - grpc_iomgr_ref(); + grpc_iomgr_ref_address_resolution(1); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->cb = cb; diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index d675c2dcec..88b599b582 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -38,9 +38,7 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/alarm.h" -#include "src/core/iomgr/iomgr_posix.h" -#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/iomgr_libevent.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" @@ -51,11 +49,8 @@ typedef struct { void (*cb)(void *arg, grpc_endpoint *tcp); void *cb_arg; - gpr_mu mu; grpc_fd *fd; gpr_timespec deadline; - grpc_alarm alarm; - int refs; } async_connect; static int prepare_socket(int fd) { @@ -79,42 +74,21 @@ error: return 0; } -static void on_alarm(void *acp, int success) { - int done; - async_connect *ac = acp; - gpr_mu_lock(&ac->mu); - if (ac->fd != NULL && success) { - grpc_fd_shutdown(ac->fd); - } - done = (--ac->refs == 0); - gpr_mu_unlock(&ac->mu); - if (done) { - gpr_mu_destroy(&ac->mu); - gpr_free(ac); - } -} - -static void on_writable(void *acp, int success) { +static void on_writable(void *acp, grpc_iomgr_cb_status status) { async_connect *ac = acp; int so_error = 0; socklen_t so_error_size; int err; - int fd = ac->fd->fd; - int done; - grpc_endpoint *ep = NULL; - void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; - void *cb_arg = ac->cb_arg; - - grpc_alarm_cancel(&ac->alarm); + int fd = grpc_fd_get(ac->fd); - if (success) { + if (status == GRPC_CALLBACK_SUCCESS) { do { so_error_size = sizeof(so_error); err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); } while (err < 0 && errno == EINTR); if (err < 0) { gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno)); - goto finish; + goto error; } else if (so_error != 0) { if (so_error == ENOBUFS) { /* We will get one of these errors if we have run out of @@ -132,7 +106,7 @@ static void on_writable(void *acp, int success) { opened too many network connections. The "easy" fix: don't do that! */ gpr_log(GPR_ERROR, "kernel out of buffers"); - grpc_fd_notify_on_write(ac->fd, on_writable, ac); + grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline); return; } else { switch (so_error) { @@ -143,31 +117,27 @@ static void on_writable(void *acp, int success) { gpr_log(GPR_ERROR, "socket error: %d", so_error); break; } - goto finish; + goto error; } } else { - ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); - goto finish; + goto great_success; } } else { - gpr_log(GPR_ERROR, "on_writable failed during connect"); - goto finish; + gpr_log(GPR_ERROR, "on_writable failed during connect: status=%d", status); + goto error; } abort(); -finish: - gpr_mu_lock(&ac->mu); - if (!ep) { - grpc_fd_orphan(ac->fd, NULL, NULL); - } - done = (--ac->refs == 0); - gpr_mu_unlock(&ac->mu); - if (done) { - gpr_mu_destroy(&ac->mu); - gpr_free(ac); - } - cb(cb_arg, ep); +error: + ac->cb(ac->cb_arg, NULL); + grpc_fd_destroy(ac->fd, NULL, NULL); + gpr_free(ac); + return; + +great_success: + ac->cb(ac->cb_arg, grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + gpr_free(ac); } void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), @@ -206,7 +176,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), } while (err < 0 && errno == EINTR); if (err >= 0) { - gpr_log(GPR_DEBUG, "instant connect"); cb(arg, grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); return; @@ -222,10 +191,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac = gpr_malloc(sizeof(async_connect)); ac->cb = cb; ac->cb_arg = arg; + ac->deadline = deadline; ac->fd = grpc_fd_create(fd); - gpr_mu_init(&ac->mu); - ac->refs = 2; - - grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); - grpc_fd_notify_on_write(ac->fd, on_writable, ac); + grpc_fd_notify_on_write(ac->fd, on_writable, ac, deadline); } diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 657f34aaf9..bc3ce69e47 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -255,14 +255,18 @@ typedef struct { grpc_endpoint_read_cb read_cb; void *read_user_data; + gpr_timespec read_deadline; grpc_endpoint_write_cb write_cb; void *write_user_data; + gpr_timespec write_deadline; grpc_tcp_slice_state write_state; } grpc_tcp; -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); -static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success); +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, + grpc_iomgr_cb_status status); +static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, + grpc_iomgr_cb_status status); static void grpc_tcp_shutdown(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; @@ -272,7 +276,7 @@ static void grpc_tcp_shutdown(grpc_endpoint *ep) { static void grpc_tcp_unref(grpc_tcp *tcp) { int refcount_zero = gpr_unref(&tcp->refcount); if (refcount_zero) { - grpc_fd_orphan(tcp->em_fd, NULL, NULL); + grpc_fd_destroy(tcp->em_fd, NULL, NULL); gpr_free(tcp); } } @@ -304,7 +308,8 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, #define INLINE_SLICE_BUFFER_SIZE 8 #define MAX_READ_IOVEC 4 -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, + grpc_iomgr_cb_status status) { grpc_tcp *tcp = (grpc_tcp *)arg; int iov_size = 1; gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE]; @@ -319,12 +324,18 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE, 0); - if (!success) { + if (status == GRPC_CALLBACK_CANCELLED) { call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); grpc_tcp_unref(tcp); return; } + if (status == GRPC_CALLBACK_TIMED_OUT) { + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_TIMED_OUT); + grpc_tcp_unref(tcp); + return; + } + /* TODO(klempner): Limit the amount we read at once. */ for (;;) { allocated_bytes = slice_state_append_blocks_into_iovec( @@ -366,7 +377,8 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { } else { /* Spurious read event, consume it here */ slice_state_destroy(&read_state); - grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp); + grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, + tcp->read_deadline); } } else { /* TODO(klempner): Log interesting errors */ @@ -395,13 +407,14 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { } static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data) { + void *user_data, gpr_timespec deadline) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_user_data = user_data; + tcp->read_deadline = deadline; gpr_ref(&tcp->refcount); - grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp); + grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, deadline); } #define MAX_WRITE_IOVEC 16 @@ -447,24 +460,34 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) { }; } -static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) { +static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, + grpc_iomgr_cb_status status) { grpc_tcp *tcp = (grpc_tcp *)arg; grpc_endpoint_write_status write_status; grpc_endpoint_cb_status cb_status; grpc_endpoint_write_cb cb; - if (!success) { + cb_status = GRPC_ENDPOINT_CB_OK; + + if (status == GRPC_CALLBACK_CANCELLED) { + cb_status = GRPC_ENDPOINT_CB_SHUTDOWN; + } else if (status == GRPC_CALLBACK_TIMED_OUT) { + cb_status = GRPC_ENDPOINT_CB_TIMED_OUT; + } + + if (cb_status != GRPC_ENDPOINT_CB_OK) { slice_state_destroy(&tcp->write_state); cb = tcp->write_cb; tcp->write_cb = NULL; - cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN); + cb(tcp->write_user_data, cb_status); grpc_tcp_unref(tcp); return; } write_status = grpc_tcp_flush(tcp); if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { - grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp); + grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, + tcp->write_deadline); } else { slice_state_destroy(&tcp->write_state); if (write_status == GRPC_ENDPOINT_WRITE_DONE) { @@ -479,11 +502,9 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) { } } -static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, - gpr_slice *slices, - size_t nslices, - grpc_endpoint_write_cb cb, - void *user_data) { +static grpc_endpoint_write_status grpc_tcp_write( + grpc_endpoint *ep, gpr_slice *slices, size_t nslices, + grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_endpoint_write_status status; @@ -509,15 +530,17 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, gpr_ref(&tcp->refcount); tcp->write_cb = cb; tcp->write_user_data = user_data; - grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp); + tcp->write_deadline = deadline; + grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, + tcp->write_deadline); } return status; } static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { - grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_pollset_add_fd(pollset, tcp->em_fd); + /* tickle the pollset so we crash if things aren't wired correctly */ + pollset->unused++; } static const grpc_endpoint_vtable vtable = { @@ -527,12 +550,14 @@ static const grpc_endpoint_vtable vtable = { grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; - tcp->fd = em_fd->fd; + tcp->fd = grpc_fd_get(em_fd); tcp->read_cb = NULL; tcp->write_cb = NULL; tcp->read_user_data = NULL; tcp->write_user_data = NULL; tcp->slice_size = slice_size; + tcp->read_deadline = gpr_inf_future; + tcp->write_deadline = gpr_inf_future; slice_state_init(&tcp->write_state, NULL, 0, 0); /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h index c3eef1b4b7..830394d534 100644 --- a/src/core/iomgr/tcp_posix.h +++ b/src/core/iomgr/tcp_posix.h @@ -45,7 +45,7 @@ */ #include "src/core/iomgr/endpoint.h" -#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_libevent.h" #define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 1968246b75..46fba13f90 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -49,8 +49,8 @@ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep); grpc_tcp_server *grpc_tcp_server_create(); /* Start listening to bound ports */ -void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset *pollset, - grpc_tcp_server_cb cb, void *cb_arg); +void grpc_tcp_server_start(grpc_tcp_server *server, grpc_tcp_server_cb cb, + void *cb_arg); /* Add a port to the server, returning true on success, or false otherwise. diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 5ed517748a..2abaf15ce4 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -45,7 +45,7 @@ #include <string.h> #include <errno.h> -#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/iomgr_libevent.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" @@ -97,8 +97,13 @@ grpc_tcp_server *grpc_tcp_server_create() { return s; } +static void done_destroy(void *p, grpc_iomgr_cb_status status) { + gpr_event_set(p, (void *)1); +} + void grpc_tcp_server_destroy(grpc_tcp_server *s) { size_t i; + gpr_event fd_done; gpr_mu_lock(&s->mu); /* shutdown all fd's */ for (i = 0; i < s->nports; i++) { @@ -113,7 +118,9 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s) { /* delete ALL the things */ for (i = 0; i < s->nports; i++) { server_port *sp = &s->ports[i]; - grpc_fd_orphan(sp->emfd, NULL, NULL); + gpr_event_init(&fd_done); + grpc_fd_destroy(sp->emfd, done_destroy, &fd_done); + gpr_event_wait(&fd_done, gpr_inf_future); } gpr_free(s->ports); gpr_free(s); @@ -189,10 +196,10 @@ error: } /* event manager callback when reads are ready */ -static void on_read(void *arg, int success) { +static void on_read(void *arg, grpc_iomgr_cb_status status) { server_port *sp = arg; - if (!success) { + if (status != GRPC_CALLBACK_SUCCESS) { goto error; } @@ -208,7 +215,7 @@ static void on_read(void *arg, int success) { case EINTR: continue; case EAGAIN: - grpc_fd_notify_on_read(sp->emfd, on_read, sp); + grpc_fd_notify_on_read(sp->emfd, on_read, sp, gpr_inf_future); return; default: gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); @@ -247,10 +254,15 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, s->ports = gpr_realloc(s->ports, sizeof(server_port *) * s->port_capacity); } sp = &s->ports[s->nports++]; - sp->server = s; - sp->fd = fd; sp->emfd = grpc_fd_create(fd); - GPR_ASSERT(sp->emfd); + sp->fd = fd; + sp->server = s; + /* initialize the em desc */ + if (sp->emfd == NULL) { + s->nports--; + gpr_mu_unlock(&s->mu); + return 0; + } gpr_mu_unlock(&s->mu); return 1; @@ -307,8 +319,8 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, int index) { return (0 <= index && index < s->nports) ? s->ports[index].fd : -1; } -void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset, - grpc_tcp_server_cb cb, void *cb_arg) { +void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb, + void *cb_arg) { size_t i; GPR_ASSERT(cb); gpr_mu_lock(&s->mu); @@ -317,10 +329,8 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset, s->cb = cb; s->cb_arg = cb_arg; for (i = 0; i < s->nports; i++) { - if (pollset) { - grpc_pollset_add_fd(pollset, s->ports[i].emfd); - } - grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i]); + grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i], + gpr_inf_future); s->active_ports++; } gpr_mu_unlock(&s->mu); diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index c99ac8021d..442d2fa624 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -555,11 +555,12 @@ static int fake_oauth2_has_request_metadata_only( return 1; } -void on_simulated_token_fetch_done(void *user_data, int success) { +void on_simulated_token_fetch_done(void *user_data, + grpc_iomgr_cb_status status) { grpc_credentials_metadata_request *r = (grpc_credentials_metadata_request *)user_data; grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)r->creds; - GPR_ASSERT(success); + GPR_ASSERT(status == GRPC_CALLBACK_SUCCESS); r->cb(r->user_data, &c->access_token_md, 1, GRPC_CREDENTIALS_OK); grpc_credentials_metadata_request_destroy(r); } diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index 7f0fdf73c9..cab09ca49d 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -184,7 +184,8 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices, } static void endpoint_notify_on_read(grpc_endpoint *secure_ep, - grpc_endpoint_read_cb cb, void *user_data) { + grpc_endpoint_read_cb cb, void *user_data, + gpr_timespec deadline) { secure_endpoint *ep = (secure_endpoint *)secure_ep; ep->read_cb = cb; ep->read_user_data = user_data; @@ -199,7 +200,7 @@ static void endpoint_notify_on_read(grpc_endpoint *secure_ep, return; } - grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep); + grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep, deadline); } static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, @@ -216,11 +217,9 @@ static void on_write(void *data, grpc_endpoint_cb_status error) { secure_endpoint_unref(ep); } -static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep, - gpr_slice *slices, - size_t nslices, - grpc_endpoint_write_cb cb, - void *user_data) { +static grpc_endpoint_write_status endpoint_write( + grpc_endpoint *secure_ep, gpr_slice *slices, size_t nslices, + grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) { int i = 0; int output_buffer_count = 0; tsi_result result = TSI_OK; @@ -309,7 +308,7 @@ static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep, /* Need to keep the endpoint alive across a transport */ secure_endpoint_ref(ep); status = grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices, - output_buffer_count, on_write, ep); + output_buffer_count, on_write, ep, deadline); if (status != GRPC_ENDPOINT_WRITE_PENDING) { secure_endpoint_unref(ep); } diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c index 3df91ed8e7..eb11251912 100644 --- a/src/core/security/secure_transport_setup.c +++ b/src/core/security/secure_transport_setup.c @@ -105,7 +105,6 @@ static void check_peer(grpc_secure_transport_setup *s) { grpc_security_status peer_status; tsi_peer peer; tsi_result result = tsi_handshaker_extract_peer(s->handshaker, &peer); - if (result != TSI_OK) { gpr_log(GPR_ERROR, "Peer extraction failed with error %s", tsi_result_to_string(result)); @@ -153,8 +152,9 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) { gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset); /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - write_status = grpc_endpoint_write(s->endpoint, &to_send, 1, - on_handshake_data_sent_to_peer, s); + write_status = + grpc_endpoint_write(s->endpoint, &to_send, 1, + on_handshake_data_sent_to_peer, s, gpr_inf_future); if (write_status == GRPC_ENDPOINT_WRITE_ERROR) { gpr_log(GPR_ERROR, "Could not send handshake data to peer."); secure_transport_setup_done(s, 0); @@ -200,7 +200,8 @@ static void on_handshake_data_received_from_peer( /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ grpc_endpoint_notify_on_read(s->endpoint, - on_handshake_data_received_from_peer, setup); + on_handshake_data_received_from_peer, setup, + gpr_inf_future); cleanup_slices(slices, nslices); return; } else { @@ -257,7 +258,8 @@ static void on_handshake_data_sent_to_peer(void *setup, /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ grpc_endpoint_notify_on_read(s->endpoint, - on_handshake_data_received_from_peer, setup); + on_handshake_data_received_from_peer, setup, + gpr_inf_future); } else { check_peer(s); } diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 9d7c0e5e5a..28b56dd4c9 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -77,9 +77,9 @@ static void on_accept(void *server, grpc_endpoint *tcp) { /* Note: the following code is the same with server_chttp2.c */ /* Server callback: start listening on our ports */ -static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) { +static void start(grpc_server *server, void *tcpp) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(tcp, pollset, on_accept, server); + grpc_tcp_server_start(tcp, on_accept, server); } /* Server callback: destroy the tcp listener (so we don't generate further diff --git a/src/core/support/time.c b/src/core/support/time.c index 5330092f56..712bdf441c 100644 --- a/src/core/support/time.c +++ b/src/core/support/time.c @@ -249,18 +249,3 @@ gpr_timespec gpr_timespec_from_timeval(struct timeval t) { ts.tv_nsec = t.tv_usec * 1000; return ts; } - -gpr_int32 gpr_time_to_millis(gpr_timespec t) { - if (t.tv_sec >= 2147483) { - if (t.tv_sec == 2147483 && t.tv_nsec < 648 * GPR_NS_PER_MS) { - return 2147483 * GPR_MS_PER_SEC + t.tv_nsec / GPR_NS_PER_MS; - } - return 2147483647; - } else if (t.tv_sec <= -2147483) { - /* TODO(ctiller): correct handling here (it's so far in the past do we - care?) */ - return -2147483648; - } else { - return t.tv_sec * GPR_MS_PER_SEC + t.tv_nsec / GPR_NS_PER_MS; - } -} diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 9c5f5064eb..9ed617f665 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -878,9 +878,9 @@ grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) { return &call->incoming_metadata; } -static void call_alarm(void *arg, int success) { +static void call_alarm(void *arg, grpc_iomgr_cb_status status) { grpc_call *call = arg; - if (success) { + if (status == GRPC_CALLBACK_SUCCESS) { grpc_call_cancel(call); } grpc_call_internal_unref(call); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index b59c36e03a..4837f5b978 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -36,7 +36,7 @@ #include <stdio.h> #include <string.h> -#include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/iomgr_completion_queue_interface.h" #include "src/core/surface/call.h" #include "src/core/surface/event_string.h" #include "src/core/surface/surface_trace.h" @@ -61,7 +61,6 @@ typedef struct event { /* Completion queue structure */ struct grpc_completion_queue { - /* TODO(ctiller): see if this can be removed */ int allow_polling; /* When refs drops to zero, we are in shutdown mode, and will be destroyable @@ -101,7 +100,7 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) { /* Create and append an event to the queue. Returns the event so that its data members can be filled in. - Requires GRPC_POLLSET_MU(&cc->pollset) locked. */ + Requires grpc_iomgr_mu locked. */ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data) { @@ -127,8 +126,7 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, ev->bucket_prev = cc->buckets[bucket]->bucket_prev; ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev; } - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); - grpc_pollset_kick(&cc->pollset); + gpr_cv_broadcast(&grpc_iomgr_cv); return ev; } @@ -151,7 +149,7 @@ static void end_op_locked(grpc_completion_queue *cc, if (gpr_unref(&cc->refs)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); + gpr_cv_broadcast(&grpc_iomgr_cv); } } @@ -159,11 +157,11 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data, grpc_byte_buffer *read) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data); ev->base.data.read = read; end_op_locked(cc, GRPC_READ); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, @@ -171,11 +169,11 @@ void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.invoke_accepted = error; end_op_locked(cc, GRPC_INVOKE_ACCEPTED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, @@ -183,11 +181,11 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.write_accepted = error; end_op_locked(cc, GRPC_WRITE_ACCEPTED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, @@ -195,11 +193,11 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.finish_accepted = error; end_op_locked(cc, GRPC_FINISH_ACCEPTED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, @@ -208,13 +206,13 @@ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, void *user_data, size_t count, grpc_metadata *elements) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish, user_data); ev->base.data.client_metadata_read.count = count; ev->base.data.client_metadata_read.elements = elements; end_op_locked(cc, GRPC_CLIENT_METADATA_READ); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, @@ -223,14 +221,14 @@ void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_metadata *metadata_elements, size_t metadata_count) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data); ev->base.data.finished.status = status; ev->base.data.finished.details = details; ev->base.data.finished.metadata_count = metadata_count; ev->base.data.finished.metadata_elements = metadata_elements; end_op_locked(cc, GRPC_FINISHED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, @@ -239,7 +237,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, gpr_timespec deadline, size_t metadata_count, grpc_metadata *metadata_elements) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data); ev->base.data.server_rpc_new.method = method; ev->base.data.server_rpc_new.host = host; @@ -247,7 +245,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, ev->base.data.server_rpc_new.metadata_count = metadata_count; ev->base.data.server_rpc_new.metadata_elements = metadata_elements; end_op_locked(cc, GRPC_SERVER_RPC_NEW); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } /* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ @@ -264,7 +262,7 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline) { event *ev = NULL; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); for (;;) { if (cc->queue != NULL) { gpr_uintptr bucket; @@ -290,16 +288,15 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) { + if (cc->allow_polling && grpc_iomgr_work(deadline)) { continue; } - if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), - GRPC_POLLSET_MU(&cc->pollset), deadline)) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) { + gpr_mu_unlock(&grpc_iomgr_mu); return NULL; } } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); return &ev->base; } @@ -337,7 +334,7 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec deadline) { event *ev = NULL; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); for (;;) { if ((ev = pluck_event(cc, tag))) { break; @@ -346,16 +343,15 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) { + if (cc->allow_polling && grpc_iomgr_work(deadline)) { continue; } - if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), - GRPC_POLLSET_MU(&cc->pollset), deadline)) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) { + gpr_mu_unlock(&grpc_iomgr_mu); return NULL; } } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); return &ev->base; } @@ -364,11 +360,11 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->refs)) { - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_mu_unlock(&grpc_iomgr_mu); } } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index aa544a97f2..3829e7aa8f 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -52,7 +52,7 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list; typedef struct listener { void *arg; - void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset); + void (*start)(grpc_server *server, void *arg); void (*destroy)(grpc_server *server, void *arg); struct listener *next; } listener; @@ -192,7 +192,7 @@ static void orphan_channel(channel_data *chand) { chand->next = chand->prev = chand; } -static void finish_destroy_channel(void *cd, int success) { +static void finish_destroy_channel(void *cd, grpc_iomgr_cb_status status) { channel_data *chand = cd; grpc_server *server = chand->server; /*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/ @@ -247,7 +247,7 @@ static void start_new_rpc(grpc_call_element *elem) { gpr_mu_unlock(&server->mu); } -static void kill_zombie(void *elem, int success) { +static void kill_zombie(void *elem, grpc_iomgr_cb_status status) { grpc_call_destroy(grpc_call_from_top_element(elem)); } @@ -336,7 +336,7 @@ static void channel_op(grpc_channel_element *elem, } } -static void finish_shutdown_channel(void *cd, int success) { +static void finish_shutdown_channel(void *cd, grpc_iomgr_cb_status status) { channel_data *chand = cd; grpc_channel_op op; op.type = GRPC_CHANNEL_DISCONNECT; @@ -468,7 +468,7 @@ void grpc_server_start(grpc_server *server) { listener *l; for (l = server->listeners; l; l = l->next) { - l->start(server, l->arg, grpc_cq_pollset(server->cq)); + l->start(server, l->arg); } } @@ -596,8 +596,7 @@ void grpc_server_destroy(grpc_server *server) { } void grpc_server_add_listener(grpc_server *server, void *arg, - void (*start)(grpc_server *server, void *arg, - grpc_pollset *pollset), + void (*start)(grpc_server *server, void *arg), void (*destroy)(grpc_server *server, void *arg)) { listener *l = gpr_malloc(sizeof(listener)); l->arg = arg; diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 61292ebe4e..f0773ab9d5 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -47,8 +47,7 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, /* Add a listener to the server: when the server starts, it will call start, and when it shuts down, it will call destroy */ void grpc_server_add_listener(grpc_server *server, void *listener, - void (*start)(grpc_server *server, void *arg, - grpc_pollset *pollset), + void (*start)(grpc_server *server, void *arg), void (*destroy)(grpc_server *server, void *arg)); /* Setup a transport - creates a channel stack, binds the transport to the diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index a0961bd449..a5fdd03774 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -59,9 +59,9 @@ static void new_transport(void *server, grpc_endpoint *tcp) { } /* Server callback: start listening on our ports */ -static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) { +static void start(grpc_server *server, void *tcpp) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(tcp, pollset, new_transport, server); + grpc_tcp_server_start(tcp, new_transport, server); } /* Server callback: destroy the tcp listener (so we don't generate further diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 5bf763e76f..a8ae8cc5bc 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -711,7 +711,7 @@ static void unlock(transport *t) { /* write some bytes if necessary */ while (start_write) { switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count, - finish_write, t)) { + finish_write, t, gpr_inf_future)) { case GRPC_ENDPOINT_WRITE_DONE: /* grab the lock directly without wrappers since we just want to continue writes if we loop: no need to check read callbacks again */ @@ -1617,6 +1617,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, case GRPC_ENDPOINT_CB_SHUTDOWN: case GRPC_ENDPOINT_CB_EOF: case GRPC_ENDPOINT_CB_ERROR: + case GRPC_ENDPOINT_CB_TIMED_OUT: lock(t); drop_connection(t); t->reading = 0; @@ -1641,7 +1642,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]); if (keep_reading) { - grpc_endpoint_notify_on_read(t->ep, recv_data, t); + grpc_endpoint_notify_on_read(t->ep, recv_data, t, gpr_inf_future); } } diff --git a/src/ruby/Gemfile b/src/ruby/Gemfile index 597a7d4f4b..dc05f7946b 100755 --- a/src/ruby/Gemfile +++ b/src/ruby/Gemfile @@ -1,4 +1,12 @@ source 'https://rubygems.org' +# Modify this when working locally, see README.md +# e.g, +# gem 'beefcake', path: "/usr/local/google/repos/beefcake" +# +# The default value is what's used for gRPC ruby's GCE configuration +# +gem 'beefcake', path: "/var/local/git/beefcake" + # Specify your gem's dependencies in grpc.gemspec gemspec diff --git a/src/ruby/README.md b/src/ruby/README.md index 23aec2b20a..3a5c50819b 100755 --- a/src/ruby/README.md +++ b/src/ruby/README.md @@ -20,25 +20,67 @@ DEPENDENCIES The extension can be built and tested using [rake](https://rubygems.org/gems/rake). However, the rake-extensiontask rule is not supported on older versions of rubygems, and the necessary version of -rubygems. +rubygems is not available on the latest version of Goobuntu. This is resolved by using [RVM](https://rvm.io/) instead; install a single-user -ruby environment, and develop on the latest stable version of ruby (2.1.5). +ruby environment, and develop on the latest stable version of ruby (2.1.2). + + +* Proto code generation + +To build generate service stubs and skeletons, it's currently necessary to use +a patched version of a beefcake, a simple third-party proto2 library. This is +feature compatible with proto3 and will be replaced by official proto3 support +in protoc. + +* Patched protoc + +The patched version of beefcake in turn depends on a patched version of protoc. +This is an update of the latest open source release of protoc with some forward +looking proto3 patches. INSTALLATION PREREQUISITES -------------------------- +Install the patched protoc + +$ cd <git_repo_dir> +$ git clone sso://team/one-platform-grpc-team/protobuf +$ cd protobuf +$ ./configure --prefix=/usr +$ make +$ sudo make install + +Install an update to OpenSSL with ALPN support + +$ wget https://www.openssl.org/source/openssl-1.0.2-beta3.tar.gz +$ tar -zxvf openssl-1.0.2-beta3.tar.gz +$ cd openssl-1.0.2-beta3 +$ ./config shared +$ make +$ sudo make install + Install RVM +$ # the -with-openssl-dir ensures that ruby uses the updated version of SSL $ command curl -sSL https://rvm.io/mpapis.asc | gpg --import - $ \curl -sSL https://get.rvm.io | bash -s stable --ruby $ $ # follow the instructions to ensure that your're using the latest stable version of Ruby $ # and that the rvm command is installed $ +$ rvm reinstall 2.1.5 --with-openssl-dir=/usr/local/ssl $ gem install bundler # install bundler, the standard ruby package manager +Install the patched beefcake, and update the Gemfile to reference + +$ cd <git_repo_dir> +$ git clone sso://team/one-platform-grpc-team/grpc-ruby-beefcake beefcake +$ cd beefcake +$ bundle install +$ + HACKING ------- diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index d0478bb4d1..6e7bfeef97 100644 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -47,11 +47,9 @@ require 'minitest' require 'minitest/assertions' require 'grpc' -require 'google/protobuf' -require 'test/cpp/interop/test_services' -require 'test/cpp/interop/messages' -require 'test/cpp/interop/empty' +require 'third_party/stubby/testing/proto/test.pb' +require 'third_party/stubby/testing/proto/messages.pb' # loads the certificates used to access the test server securely. def load_test_certs @@ -81,7 +79,7 @@ end # produces a string of null chars (\0) of length l. def nulls(l) raise 'requires #{l} to be +ve' if l < 0 - [].pack('x' * l).force_encoding('utf-8') + [].pack('x' * l) end # a PingPongPlayer implements the ping pong bidi test. @@ -107,11 +105,10 @@ class PingPongPlayer req_size, resp_size = m req = req_cls.new(:payload => Payload.new(:body => nulls(req_size)), :response_type => COMPRESSABLE, - :response_parameters => [p_cls.new(:size => resp_size)]) + :response_parameters => p_cls.new(:size => resp_size)) yield req resp = @queue.pop - assert_equal(PayloadType.lookup(COMPRESSABLE), resp.payload.type, - 'payload type is wrong') + assert_equal(COMPRESSABLE, resp.payload.type, 'payload type is wrong') assert_equal(resp_size, resp.payload.body.length, 'payload body #{i} has the wrong length') p "OK: ping_pong #{count}" @@ -135,10 +132,10 @@ class NamedTests # TESTING # PASSED # FAIL - # ruby server: fails protobuf-ruby can't pass an empty message + # ruby server: fails beefcake throws on deserializing the 0-length message def empty_unary - resp = @stub.empty_call(Empty.new) - assert resp.is_a?(Empty), 'empty_unary: invalid response' + resp = @stub.empty_call(Proto2::Empty.new) + assert resp.is_a?(Proto::Empty), 'empty_unary: invalid response' p 'OK: empty_unary' end @@ -189,8 +186,7 @@ class NamedTests resps = @stub.streaming_output_call(req) resps.each_with_index do |r, i| assert i < msg_sizes.length, 'too many responses' - assert_equal(PayloadType.lookup(COMPRESSABLE), r.payload.type, - 'payload type is wrong') + assert_equal(COMPRESSABLE, r.payload.type, 'payload type is wrong') assert_equal(msg_sizes[i], r.payload.body.length, 'payload body #{i} has the wrong length') end @@ -201,6 +197,9 @@ class NamedTests # PASSED # ruby server # FAILED + # + # TODO(temiola): update this test to stay consistent with the java test's + # interpretation of the test spec. def ping_pong msg_sizes = [[27182, 31415], [8, 9], [1828, 2653], [45904, 58979]] ppp = PingPongPlayer.new(msg_sizes) diff --git a/src/ruby/bin/interop/interop_server.rb b/src/ruby/bin/interop/interop_server.rb index 53e271e80d..35d69f6fd3 100644 --- a/src/ruby/bin/interop/interop_server.rb +++ b/src/ruby/bin/interop/interop_server.rb @@ -47,9 +47,8 @@ require 'optparse' require 'grpc' -require 'test/cpp/interop/test_services' -require 'test/cpp/interop/messages' -require 'test/cpp/interop/empty' +require 'third_party/stubby/testing/proto/test.pb' +require 'third_party/stubby/testing/proto/messages.pb' # loads the certificates by the test server. def load_test_certs @@ -68,7 +67,7 @@ end # produces a string of null chars (\0) of length l. def nulls(l) raise 'requires #{l} to be +ve' if l < 0 - [].pack('x' * l).force_encoding('utf-8') + [].pack('x' * l) end # A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. @@ -99,7 +98,7 @@ class TestTarget < Grpc::Testing::TestService::Service include Grpc::Testing::PayloadType def empty_call(empty, call) - Empty.new + Proto::Empty.new end def unary_call(simple_req, call) diff --git a/src/ruby/bin/interop/net/proto2/bridge/proto/message_set.pb.rb b/src/ruby/bin/interop/net/proto2/bridge/proto/message_set.pb.rb new file mode 100755 index 0000000000..eadd1d4ceb --- /dev/null +++ b/src/ruby/bin/interop/net/proto2/bridge/proto/message_set.pb.rb @@ -0,0 +1,14 @@ +## Generated from net/proto2/bridge/proto/message_set.proto for proto2.bridge +require 'beefcake' + +module Proto2 + module Bridge + + class MessageSet + include Beefcake::Message + end + + class MessageSet + end + end +end diff --git a/src/ruby/bin/interop/net/proto2/proto/empty.pb.rb b/src/ruby/bin/interop/net/proto2/proto/empty.pb.rb new file mode 100755 index 0000000000..2aa0c76509 --- /dev/null +++ b/src/ruby/bin/interop/net/proto2/proto/empty.pb.rb @@ -0,0 +1,12 @@ +## Generated from net/proto2/proto/empty.proto for proto2 +require 'beefcake' + +module Proto2 + + class Empty + include Beefcake::Message + end + + class Empty + end +end diff --git a/src/ruby/bin/interop/test/cpp/interop/empty.rb b/src/ruby/bin/interop/test/cpp/interop/empty.rb deleted file mode 100644 index acd4160d24..0000000000 --- a/src/ruby/bin/interop/test/cpp/interop/empty.rb +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2014, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: test/cpp/interop/empty.proto - -require 'google/protobuf' - -Google::Protobuf::DescriptorPool.generated_pool.build do - add_message "grpc.testing.Empty" do - end -end - -module Grpc - module Testing - Empty = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Empty").msgclass - end -end diff --git a/src/ruby/bin/interop/test/cpp/interop/messages.rb b/src/ruby/bin/interop/test/cpp/interop/messages.rb deleted file mode 100644 index 491608bff2..0000000000 --- a/src/ruby/bin/interop/test/cpp/interop/messages.rb +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright 2014, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: test/cpp/interop/messages.proto - -require 'google/protobuf' - -Google::Protobuf::DescriptorPool.generated_pool.build do - add_message "grpc.testing.Payload" do - optional :type, :enum, 1, "grpc.testing.PayloadType" - optional :body, :string, 2 - end - add_message "grpc.testing.SimpleRequest" do - optional :response_type, :enum, 1, "grpc.testing.PayloadType" - optional :response_size, :int32, 2 - optional :payload, :message, 3, "grpc.testing.Payload" - end - add_message "grpc.testing.SimpleResponse" do - optional :payload, :message, 1, "grpc.testing.Payload" - optional :effective_gaia_user_id, :int64, 2 - end - add_message "grpc.testing.StreamingInputCallRequest" do - optional :payload, :message, 1, "grpc.testing.Payload" - end - add_message "grpc.testing.StreamingInputCallResponse" do - optional :aggregated_payload_size, :int32, 1 - end - add_message "grpc.testing.ResponseParameters" do - optional :size, :int32, 1 - optional :interval_us, :int32, 2 - end - add_message "grpc.testing.StreamingOutputCallRequest" do - optional :response_type, :enum, 1, "grpc.testing.PayloadType" - repeated :response_parameters, :message, 2, "grpc.testing.ResponseParameters" - optional :payload, :message, 3, "grpc.testing.Payload" - end - add_message "grpc.testing.StreamingOutputCallResponse" do - optional :payload, :message, 1, "grpc.testing.Payload" - end - add_enum "grpc.testing.PayloadType" do - value :COMPRESSABLE, 0 - value :UNCOMPRESSABLE, 1 - value :RANDOM, 2 - end -end - -module Grpc - module Testing - Payload = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Payload").msgclass - SimpleRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleRequest").msgclass - SimpleResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleResponse").msgclass - StreamingInputCallRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingInputCallRequest").msgclass - StreamingInputCallResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingInputCallResponse").msgclass - ResponseParameters = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ResponseParameters").msgclass - StreamingOutputCallRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingOutputCallRequest").msgclass - StreamingOutputCallResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingOutputCallResponse").msgclass - PayloadType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.PayloadType").enummodule - end -end diff --git a/src/ruby/bin/interop/test/cpp/interop/test.rb b/src/ruby/bin/interop/test/cpp/interop/test.rb deleted file mode 100644 index 0b391ed6af..0000000000 --- a/src/ruby/bin/interop/test/cpp/interop/test.rb +++ /dev/null @@ -1,43 +0,0 @@ -# Copyright 2014, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: test/cpp/interop/test.proto - -require 'google/protobuf' - -require 'test/cpp/interop/empty' -require 'test/cpp/interop/messages' -Google::Protobuf::DescriptorPool.generated_pool.build do -end - -module Grpc - module Testing - end -end diff --git a/src/ruby/bin/interop/test/cpp/interop/test_services.rb b/src/ruby/bin/interop/test/cpp/interop/test_services.rb deleted file mode 100644 index 17b5461d3e..0000000000 --- a/src/ruby/bin/interop/test/cpp/interop/test_services.rb +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright 2014, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -# Generated by the protocol buffer compiler. DO NOT EDIT! -# Source: test/cpp/interop/test.proto for package 'grpc.testing' - -require 'grpc' -require 'test/cpp/interop/test' - -module Grpc - module Testing - module TestService - - # TODO: add proto service documentation here - class Service - - include GRPC::GenericService - - self.marshal_class_method = :encode - self.unmarshal_class_method = :decode - self.service_name = 'grpc.testing.TestService' - - rpc :EmptyCall, Empty, Empty - rpc :UnaryCall, SimpleRequest, SimpleResponse - rpc :StreamingOutputCall, StreamingOutputCallRequest, stream(StreamingOutputCallResponse) - rpc :StreamingInputCall, stream(StreamingInputCallRequest), StreamingInputCallResponse - rpc :FullDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse) - rpc :HalfDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse) - end - - Stub = Service.rpc_stub_class - end - end -end diff --git a/src/ruby/bin/interop/third_party/stubby/testing/proto/messages.pb.rb b/src/ruby/bin/interop/third_party/stubby/testing/proto/messages.pb.rb new file mode 100755 index 0000000000..9a913f93e5 --- /dev/null +++ b/src/ruby/bin/interop/third_party/stubby/testing/proto/messages.pb.rb @@ -0,0 +1,94 @@ +## Generated from third_party/stubby/testing/proto/messages.proto for grpc.testing +require 'beefcake' + +require 'net/proto2/bridge/proto/message_set.pb' + +module Grpc + module Testing + + module PayloadType + COMPRESSABLE = 0 + UNCOMPRESSABLE = 1 + RANDOM = 2 + end + + class Payload + include Beefcake::Message + end + + class SimpleRequest + include Beefcake::Message + end + + class SimpleResponse + include Beefcake::Message + end + + class SimpleContext + include Beefcake::Message + end + + class StreamingInputCallRequest + include Beefcake::Message + end + + class StreamingInputCallResponse + include Beefcake::Message + end + + class ResponseParameters + include Beefcake::Message + end + + class StreamingOutputCallRequest + include Beefcake::Message + end + + class StreamingOutputCallResponse + include Beefcake::Message + end + + class Payload + optional :type, PayloadType, 1 + optional :body, :bytes, 2 + end + + class SimpleRequest + optional :response_type, PayloadType, 1 + optional :response_size, :int32, 2 + optional :payload, Payload, 3 + end + + class SimpleResponse + optional :payload, Payload, 1 + optional :effective_gaia_user_id, :int64, 2 + end + + class SimpleContext + optional :value, :string, 1 + end + + class StreamingInputCallRequest + optional :payload, Payload, 1 + end + + class StreamingInputCallResponse + optional :aggregated_payload_size, :int32, 1 + end + + class ResponseParameters + optional :size, :int32, 1 + optional :interval_us, :int32, 2 + end + + class StreamingOutputCallRequest + optional :response_type, PayloadType, 1 + repeated :response_parameters, ResponseParameters, 2 + optional :payload, Payload, 3 + end + + class StreamingOutputCallResponse + optional :payload, Payload, 1 + end + end +end diff --git a/src/ruby/bin/interop/third_party/stubby/testing/proto/test.pb.rb b/src/ruby/bin/interop/third_party/stubby/testing/proto/test.pb.rb new file mode 100755 index 0000000000..2e21460fe6 --- /dev/null +++ b/src/ruby/bin/interop/third_party/stubby/testing/proto/test.pb.rb @@ -0,0 +1,30 @@ +## Generated from third_party/stubby/testing/proto/test.proto for grpc.testing +require 'beefcake' +require 'grpc' + +require 'third_party/stubby/testing/proto/messages.pb' +require 'net/proto2/proto/empty.pb' + +module Grpc + module Testing + + module TestService + + class Service + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + + rpc :EmptyCall, Proto2::Empty, Proto2::Empty + rpc :UnaryCall, SimpleRequest, SimpleResponse + rpc :StreamingOutputCall, StreamingOutputCallRequest, stream(StreamingOutputCallResponse) + rpc :StreamingInputCall, stream(StreamingInputCallRequest), StreamingInputCallResponse + rpc :FullDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse) + rpc :HalfDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse) + end + Stub = Service.rpc_stub_class + + end + end +end diff --git a/src/ruby/bin/math.pb.rb b/src/ruby/bin/math.pb.rb new file mode 100755 index 0000000000..f6976be568 --- /dev/null +++ b/src/ruby/bin/math.pb.rb @@ -0,0 +1,65 @@ +## Generated from bin/math.proto for math +require "beefcake" +require "grpc" + +module Math + + class DivArgs + include Beefcake::Message + end + + class DivReply + include Beefcake::Message + end + + class FibArgs + include Beefcake::Message + end + + class Num + include Beefcake::Message + end + + class FibReply + include Beefcake::Message + end + + class DivArgs + required :dividend, :int64, 1 + required :divisor, :int64, 2 + end + + class DivReply + required :quotient, :int64, 1 + required :remainder, :int64, 2 + end + + class FibArgs + optional :limit, :int64, 1 + end + + class Num + required :num, :int64, 1 + end + + class FibReply + required :count, :int64, 1 + end + + module Math + + class Service + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + + rpc :Div, DivArgs, DivReply + rpc :DivMany, stream(DivArgs), stream(DivReply) + rpc :Fib, FibArgs, stream(Num) + rpc :Sum, stream(Num), Num + end + Stub = Service.rpc_stub_class + + end +end diff --git a/src/ruby/bin/math.proto b/src/ruby/bin/math.proto index c49787ad54..de18a50260 100755 --- a/src/ruby/bin/math.proto +++ b/src/ruby/bin/math.proto @@ -1,15 +1,15 @@ -syntax = "proto3"; +syntax = "proto2"; package math; message DivArgs { - optional int64 dividend = 1; - optional int64 divisor = 2; + required int64 dividend = 1; + required int64 divisor = 2; } message DivReply { - optional int64 quotient = 1; - optional int64 remainder = 2; + required int64 quotient = 1; + required int64 remainder = 2; } message FibArgs { @@ -17,11 +17,11 @@ message FibArgs { } message Num { - optional int64 num = 1; + required int64 num = 1; } message FibReply { - optional int64 count = 1; + required int64 count = 1; } service Math { diff --git a/src/ruby/bin/math.rb b/src/ruby/bin/math.rb deleted file mode 100644 index 09d1e98586..0000000000 --- a/src/ruby/bin/math.rb +++ /dev/null @@ -1,61 +0,0 @@ -# Copyright 2014, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: math.proto - -require 'google/protobuf' - -Google::Protobuf::DescriptorPool.generated_pool.build do - add_message "math.DivArgs" do - optional :dividend, :int64, 1 - optional :divisor, :int64, 2 - end - add_message "math.DivReply" do - optional :quotient, :int64, 1 - optional :remainder, :int64, 2 - end - add_message "math.FibArgs" do - optional :limit, :int64, 1 - end - add_message "math.Num" do - optional :num, :int64, 1 - end - add_message "math.FibReply" do - optional :count, :int64, 1 - end -end - -module Math - DivArgs = Google::Protobuf::DescriptorPool.generated_pool.lookup("math.DivArgs").msgclass - DivReply = Google::Protobuf::DescriptorPool.generated_pool.lookup("math.DivReply").msgclass - FibArgs = Google::Protobuf::DescriptorPool.generated_pool.lookup("math.FibArgs").msgclass - Num = Google::Protobuf::DescriptorPool.generated_pool.lookup("math.Num").msgclass - FibReply = Google::Protobuf::DescriptorPool.generated_pool.lookup("math.FibReply").msgclass -end diff --git a/src/ruby/bin/math_client.rb b/src/ruby/bin/math_client.rb index 5cba9317f4..8a62764c08 100644 --- a/src/ruby/bin/math_client.rb +++ b/src/ruby/bin/math_client.rb @@ -40,7 +40,7 @@ $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) require 'grpc' -require 'math_services' +require 'math.pb' require 'optparse' include GRPC::Core::TimeConsts @@ -111,8 +111,8 @@ def main 'secure' => false } OptionParser.new do |opts| - opts.banner = 'Usage: [--host <hostname>:<port>] [--secure|-s]' - opts.on('--host HOST', '<hostname>:<port>') do |v| + opts.banner = 'Usage: [--host|-h <hostname>:<port>] [--secure|-s]' + opts.on('-h', '--host', '<hostname>:<port>') do |v| options['host'] = v end opts.on('-s', '--secure', 'access using test creds') do |v| diff --git a/src/ruby/bin/math_server.rb b/src/ruby/bin/math_server.rb index a0f301c3e7..ed39144d7a 100644 --- a/src/ruby/bin/math_server.rb +++ b/src/ruby/bin/math_server.rb @@ -41,7 +41,7 @@ $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) require 'forwardable' require 'grpc' -require 'math_services' +require 'math.pb' require 'optparse' # Holds state for a fibonacci series @@ -168,8 +168,8 @@ def main 'secure' => false } OptionParser.new do |opts| - opts.banner = 'Usage: [--host <hostname>:<port>] [--secure|-s]' - opts.on('--host HOST', '<hostname>:<port>') do |v| + opts.banner = 'Usage: [--host|-h <hostname>:<port>] [--secure|-s]' + opts.on('-h', '--host', '<hostname>:<port>') do |v| options['host'] = v end opts.on('-s', '--secure', 'access using test creds') do |v| diff --git a/src/ruby/bin/math_services.rb b/src/ruby/bin/math_services.rb deleted file mode 100644 index f6ca6fe060..0000000000 --- a/src/ruby/bin/math_services.rb +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2014, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -# Generated by the protocol buffer compiler. DO NOT EDIT! -# Source: math.proto for package 'math' - -require 'grpc' -require 'math' - -module Math - module Math - - # TODO: add proto service documentation here - class Service - - include GRPC::GenericService - - self.marshal_class_method = :encode - self.unmarshal_class_method = :decode - self.service_name = 'math.Math' - - rpc :Div, DivArgs, DivReply - rpc :DivMany, stream(DivArgs), stream(DivReply) - rpc :Fib, FibArgs, stream(Num) - rpc :Sum, stream(Num), Num - end - - Stub = Service.rpc_stub_class - end -end diff --git a/src/ruby/bin/noproto_client.rb b/src/ruby/bin/noproto_client.rb index 50ae9fb68f..29ed6d9f7c 100644 --- a/src/ruby/bin/noproto_client.rb +++ b/src/ruby/bin/noproto_client.rb @@ -37,68 +37,36 @@ lib_dir = File.join(File.dirname(this_dir), 'lib') $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) require 'grpc' -require 'optparse' -class NoProtoMsg - def self.marshal(o) +class EchoMsg + def marshal '' end def self.unmarshal(o) - NoProtoMsg.new + EchoMsg.new end end -class NoProtoService +class EchoService include GRPC::GenericService - rpc :AnRPC, NoProtoMsg, NoProtoMsg -end + rpc :AnRPC, EchoMsg, EchoMsg -NoProtoStub = NoProtoService.rpc_stub_class + def initialize(default_var='ignored') + end -def load_test_certs - this_dir = File.expand_path(File.dirname(__FILE__)) - data_dir = File.join(File.dirname(this_dir), 'spec/testdata') - files = ['ca.pem', 'server1.key', 'server1.pem'] - files.map { |f| File.open(File.join(data_dir, f)).read } + def an_rpc(req, call) + logger.info('echo service received a request') + req + end end -def test_creds - certs = load_test_certs - creds = GRPC::Core::Credentials.new(certs[0]) -end +EchoStub = EchoService.rpc_stub_class def main - options = { - 'host' => 'localhost:7071', - 'secure' => false - } - OptionParser.new do |opts| - opts.banner = 'Usage: [--host <hostname>:<port>] [--secure|-s]' - opts.on('--host HOST', '<hostname>:<port>') do |v| - options['host'] = v - end - opts.on('-s', '--secure', 'access using test creds') do |v| - options['secure'] = true - end - end.parse! - - if options['secure'] - stub_opts = { - :creds => test_creds, - GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.com', - } - p stub_opts - p options['host'] - stub = NoProtoStub.new(options['host'], **stub_opts) - logger.info("... connecting securely on #{options['host']}") - else - stub = NoProtoStub.new(options['host']) - logger.info("... connecting insecurely on #{options['host']}") - end - - logger.info('sending a NoProto rpc') - resp = stub.an_rpc(NoProtoMsg.new) + stub = EchoStub.new('localhost:9090') + logger.info('sending an rpc') + resp = stub.an_rpc(EchoMsg.new) logger.info("got a response: #{resp}") end diff --git a/src/ruby/bin/noproto_server.rb b/src/ruby/bin/noproto_server.rb index d410827b22..7b74fa13ec 100644 --- a/src/ruby/bin/noproto_server.rb +++ b/src/ruby/bin/noproto_server.rb @@ -37,24 +37,23 @@ lib_dir = File.join(File.dirname(this_dir), 'lib') $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) require 'grpc' -require 'optparse' -class NoProtoMsg - def self.marshal(o) +class EchoMsg + def marshal '' end def self.unmarshal(o) - NoProtoMsg.new + EchoMsg.new end end -class NoProtoService +class EchoService include GRPC::GenericService - rpc :AnRPC, NoProtoMsg, NoProtoMsg + rpc :AnRPC, EchoMsg, EchoMsg end -class NoProto < NoProtoService +class Echo < EchoService def initialize(default_var='ignored') end @@ -64,46 +63,11 @@ class NoProto < NoProtoService end end -def load_test_certs - this_dir = File.expand_path(File.dirname(__FILE__)) - data_dir = File.join(File.dirname(this_dir), 'spec/testdata') - files = ['ca.pem', 'server1.key', 'server1.pem'] - files.map { |f| File.open(File.join(data_dir, f)).read } -end - -def test_server_creds - certs = load_test_certs - server_creds = GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2]) -end - def main - options = { - 'host' => 'localhost:9090', - 'secure' => false - } - OptionParser.new do |opts| - opts.banner = 'Usage: [--host <hostname>:<port>] [--secure|-s]' - opts.on('--host HOST', '<hostname>:<port>') do |v| - options['host'] = v - end - opts.on('-s', '--secure', 'access using test creds') do |v| - options['secure'] = true - end - end.parse! - - if options['secure'] - s = GRPC::RpcServer.new(creds: test_server_creds) - s.add_http2_port(options['host'], true) - logger.info("... running securely on #{options['host']}") - else - s = GRPC::RpcServer.new - s.add_http2_port(options['host']) - logger.info("... running insecurely on #{options['host']}") - end - - s.handle(NoProto) + s = GRPC::RpcServer.new() + s.add_http2_port('localhost:9090') + s.handle(Echo) s.run end - main diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec index 3e1dcd578b..b535de4946 100755 --- a/src/ruby/grpc.gemspec +++ b/src/ruby/grpc.gemspec @@ -19,7 +19,7 @@ Gem::Specification.new do |s| s.add_dependency 'xray' s.add_dependency 'logging', '~> 1.8' - s.add_dependency 'google-protobuf', '~> 3.0.0alpha' + s.add_dependency 'beefcake', '~> 1.1' s.add_dependency 'minitest', '~> 5.4' # not a dev dependency, used by the interop tests s.add_development_dependency "bundler", "~> 1.7" diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index 81c67ec859..c7eec3388c 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -27,6 +27,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +require 'grpc/beefcake' # extends beefcake require 'grpc/errors' require 'grpc/grpc' require 'grpc/logconfig' diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb index f3fe638fce..6a4356fab9 100644 --- a/src/ruby/lib/grpc/generic/service.rb +++ b/src/ruby/lib/grpc/generic/service.rb @@ -93,8 +93,6 @@ module Google::RPC # The Dsl verifies that the types in the descriptor have both the # unmarshal and marshal methods. attr_writer(:marshal_class_method, :unmarshal_class_method) - - # This allows configuration of the service name. attr_accessor(:service_name) # Adds an RPC spec. @@ -119,8 +117,8 @@ module Google::RPC end def inherited(subclass) - # Each subclass should have a distinct class variable with its own - # rpc_descs + # Each subclass should have distinct class variable with its own + # rpc_descs. subclass.rpc_descs.merge!(rpc_descs) subclass.service_name = service_name end @@ -229,9 +227,8 @@ module Google::RPC def self.included(o) o.extend(Dsl) - # Update to the use the service name including module. Proivde a default - # that can be nil e,g. when modules are declared dynamically. - return unless o.service_name.nil? + # Update to the use the name including module. This can be nil e,g. when + # modules are declared dynamically. if o.name.nil? o.service_name = 'GenericService' else diff --git a/src/ruby/spec/generic/service_spec.rb b/src/ruby/spec/generic/service_spec.rb index a8e0c6f52f..dc921d8934 100644 --- a/src/ruby/spec/generic/service_spec.rb +++ b/src/ruby/spec/generic/service_spec.rb @@ -108,39 +108,6 @@ describe GenericService do expect(c.rpc_descs[:AnRpc]).to be_a(GRPC::RpcDesc) end - it 'adds a default service name' do - c = Class.new do - include GenericService - end - expect(c.service_name).to eq('GenericService') - end - - it 'adds a default service name to subclasses' do - base = Class.new do - include GenericService - end - c = Class.new(base) do - end - expect(c.service_name).to eq('GenericService') - end - - it 'adds the specified service name' do - c = Class.new do - include GenericService - self.service_name = 'test.service.TestService' - end - expect(c.service_name).to eq('test.service.TestService') - end - - it 'adds the specified service name to subclasses' do - base = Class.new do - include GenericService - self.service_name = 'test.service.TestService' - end - c = Class.new(base) do - end - expect(c.service_name).to eq('test.service.TestService') - end end describe '#include' do diff --git a/templates/Makefile.template b/templates/Makefile.template index 44144c8f7e..bdb9765114 100644 --- a/templates/Makefile.template +++ b/templates/Makefile.template @@ -100,7 +100,7 @@ CPPFLAGS += -g -fPIC -Wall -Werror -Wno-long-long LDFLAGS += -g -pthread -fPIC INCLUDES = . include gens -LIBS = rt m z pthread +LIBS = rt m z event event_pthreads pthread LIBSXX = protobuf LIBS_PROTOC = protoc protobuf @@ -159,12 +159,20 @@ else IS_GIT_FOLDER = true endif +EVENT2_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/event2.c -levent $(LDFLAGS) OPENSSL_ALPN_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/openssl-alpn.c -lssl -lcrypto -ldl $(LDFLAGS) ZLIB_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/zlib.c -lz $(LDFLAGS) +HAS_SYSTEM_EVENT2 = $(shell $(EVENT2_CHECK_CMD) 2> /dev/null && echo true || echo false) HAS_SYSTEM_OPENSSL_ALPN = $(shell $(OPENSSL_ALPN_CHECK_CMD) 2> /dev/null && echo true || echo false) HAS_SYSTEM_ZLIB = $(shell $(ZLIB_CHECK_CMD) 2> /dev/null && echo true || echo false) +ifeq ($(wildcard third_party/libevent/include/event2/event.h),) +HAS_EMBEDDED_EVENT2 = false +else +HAS_EMBEDDED_EVENT2 = true +endif + ifeq ($(wildcard third_party/openssl/ssl/ssl.h),) HAS_EMBEDDED_OPENSSL_ALPN = false else @@ -177,6 +185,12 @@ else HAS_EMBEDDED_ZLIB = true endif +ifneq ($(SYSTEM),MINGW32) +ifeq ($(HAS_SYSTEM_EVENT2),false) +DEP_MISSING += libevent +endif +endif + ifeq ($(HAS_SYSTEM_ZLIB),false) ifeq ($(HAS_EMBEDDED_ZLIB),true) ZLIB_DEP = third_party/zlib/libz.a @@ -277,6 +291,7 @@ ${tgt.name}: bins/$(CONFIG)/${tgt.name} % endfor run_dep_checks: + $(EVENT2_CHECK_CMD) || true $(OPENSSL_ALPN_CHECK_CMD) || true $(ZLIB_CHECK_CMD) || true diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c index e5b7304743..1181f1b4de 100644 --- a/test/core/end2end/cq_verifier.c +++ b/test/core/end2end/cq_verifier.c @@ -371,12 +371,7 @@ void cq_verify_empty(cq_verifier *v) { GPR_ASSERT(v->expect.next == &v->expect && "expectation queue must be empty"); ev = grpc_completion_queue_next(v->cq, deadline); - if (ev != NULL) { - char *s = grpc_event_string(ev); - gpr_log(GPR_ERROR, "unexpected event (expected nothing): %s", s); - gpr_free(s); - abort(); - } + GPR_ASSERT(ev == NULL); } static expectation *add(cq_verifier *v, grpc_completion_type type, void *tag) { diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index cb5c6f7cad..b3fac796f4 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -120,7 +120,6 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, GPR_ASSERT(!f->server); f->server = grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args); - grpc_server_start(f->server); grpc_create_chttp2_transport(server_setup_transport, f, server_args, sfd->server, NULL, 0, grpc_mdctx_create(), 0); } diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c index 84acfa6d6c..061e049a09 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c @@ -120,7 +120,6 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, GPR_ASSERT(!f->server); f->server = grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args); - grpc_server_start(f->server); grpc_create_chttp2_transport(server_setup_transport, f, server_args, sfd->server, NULL, 0, grpc_mdctx_create(), 0); } diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c index bad86fb9dc..d15fef6adc 100644 --- a/test/core/end2end/tests/invoke_large_request.c +++ b/test/core/end2end/tests/invoke_large_request.c @@ -112,7 +112,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) { gpr_slice request_payload_slice = large_slice(); grpc_byte_buffer *request_payload = grpc_byte_buffer_create(&request_payload_slice, 1); - gpr_timespec deadline = n_seconds_time(30); + gpr_timespec deadline = n_seconds_time(10); grpc_end2end_test_fixture f = begin_test(config, __FUNCTION__, NULL, NULL); cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c index a418d1b15f..08198d49fb 100644 --- a/test/core/end2end/tests/max_concurrent_streams.c +++ b/test/core/end2end/tests/max_concurrent_streams.c @@ -156,12 +156,9 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { grpc_call *c2; grpc_call *s1; grpc_call *s2; - int live_call; - grpc_call *live_call_obj; gpr_timespec deadline; cq_verifier *v_client; cq_verifier *v_server; - grpc_event *ev; server_arg.key = GRPC_ARG_MAX_CONCURRENT_STREAMS; server_arg.type = GRPC_ARG_INTEGER; @@ -183,10 +180,9 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { /* start two requests - ensuring that the second is not accepted until the first completes */ deadline = five_seconds_time(); - c1 = - grpc_channel_create_call(f.client, "/alpha", "test.google.com", deadline); + c1 = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c1); - c2 = grpc_channel_create_call(f.client, "/beta", "test.google.com", deadline); + c2 = grpc_channel_create_call(f.client, "/bar", "test.google.com", deadline); GPR_ASSERT(c1); GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); @@ -195,29 +191,19 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { tag(301), tag(302), 0)); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(c2, f.client_cq, tag(400), tag(401), tag(402), 0)); - ev = grpc_completion_queue_next( - f.client_cq, gpr_time_add(gpr_now(), gpr_time_from_seconds(10))); - GPR_ASSERT(ev); - GPR_ASSERT(ev->type == GRPC_INVOKE_ACCEPTED); - GPR_ASSERT(ev->data.invoke_accepted == GRPC_OP_OK); - /* The /alpha or /beta calls started above could be invoked (but NOT both); - * check this here */ - live_call = (int)(gpr_intptr)ev->tag; - live_call_obj = live_call == 300 ? c1 : c2; - grpc_event_finish(ev); + cq_expect_invoke_accepted(v_client, tag(300), GRPC_OP_OK); + cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == - grpc_call_writes_done(live_call_obj, tag(live_call + 3))); - cq_expect_finish_accepted(v_client, tag(live_call + 3), GRPC_OP_OK); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c1, tag(303))); + cq_expect_finish_accepted(v_client, tag(303), GRPC_OP_OK); cq_verify(v_client); - cq_expect_server_rpc_new(v_server, &s1, tag(100), - live_call == 300 ? "/alpha" : "/beta", - "test.google.com", deadline, NULL); + cq_expect_server_rpc_new(v_server, &s1, tag(100), "/foo", "test.google.com", + deadline, NULL); cq_verify(v_server); GPR_ASSERT(GRPC_CALL_OK == grpc_call_accept(s1, f.server_cq, tag(102), 0)); - cq_expect_client_metadata_read(v_client, tag(live_call + 1), NULL); + cq_expect_client_metadata_read(v_client, tag(301), NULL); cq_verify(v_client); GPR_ASSERT(GRPC_CALL_OK == @@ -228,26 +214,22 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { cq_verify(v_server); /* first request is finished, we should be able to start the second */ - cq_expect_finished_with_status(v_client, tag(live_call + 2), - GRPC_STATUS_UNIMPLEMENTED, "xyz", NULL); - live_call = (live_call == 300) ? 400 : 300; - live_call_obj = live_call == 300 ? c1 : c2; - cq_expect_invoke_accepted(v_client, tag(live_call), GRPC_OP_OK); + cq_expect_finished_with_status(v_client, tag(302), GRPC_STATUS_UNIMPLEMENTED, + "xyz", NULL); + cq_expect_invoke_accepted(v_client, tag(400), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == - grpc_call_writes_done(live_call_obj, tag(live_call + 3))); - cq_expect_finish_accepted(v_client, tag(live_call + 3), GRPC_OP_OK); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c2, tag(403))); + cq_expect_finish_accepted(v_client, tag(403), GRPC_OP_OK); cq_verify(v_client); GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(200))); - cq_expect_server_rpc_new(v_server, &s2, tag(200), - live_call == 300 ? "/alpha" : "/beta", - "test.google.com", deadline, NULL); + cq_expect_server_rpc_new(v_server, &s2, tag(200), "/bar", "test.google.com", + deadline, NULL); cq_verify(v_server); GPR_ASSERT(GRPC_CALL_OK == grpc_call_accept(s2, f.server_cq, tag(202), 0)); - cq_expect_client_metadata_read(v_client, tag(live_call + 1), NULL); + cq_expect_client_metadata_read(v_client, tag(401), NULL); cq_verify(v_client); GPR_ASSERT(GRPC_CALL_OK == @@ -257,8 +239,8 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { cq_expect_finished(v_server, tag(202), NULL); cq_verify(v_server); - cq_expect_finished_with_status(v_client, tag(live_call + 2), - GRPC_STATUS_UNIMPLEMENTED, "xyz", NULL); + cq_expect_finished_with_status(v_client, tag(402), GRPC_STATUS_UNIMPLEMENTED, + "xyz", NULL); cq_verify(v_client); cq_verifier_destroy(v_client); diff --git a/test/core/iomgr/alarm_list_test.c b/test/core/iomgr/alarm_list_test.c index 686d21d705..a8aa6126e6 100644 --- a/test/core/iomgr/alarm_list_test.c +++ b/test/core/iomgr/alarm_list_test.c @@ -41,13 +41,13 @@ #define MAX_CB 30 -static int cb_called[MAX_CB][2]; +static int cb_called[MAX_CB][GRPC_CALLBACK_DO_NOT_USE]; static int kicks; void grpc_kick_poller() { ++kicks; } -static void cb(void *arg, int success) { - cb_called[(gpr_intptr)arg][success]++; +static void cb(void *arg, grpc_iomgr_cb_status status) { + cb_called[(gpr_intptr)arg][status]++; } static void add_test() { @@ -72,36 +72,36 @@ static void add_test() { /* collect alarms. Only the first batch should be ready. */ GPR_ASSERT(10 == - grpc_alarm_check( - NULL, gpr_time_add(start, gpr_time_from_millis(500)), NULL)); + grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(500)))); for (i = 0; i < 20; i++) { - GPR_ASSERT(cb_called[i][1] == (i < 10)); - GPR_ASSERT(cb_called[i][0] == 0); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 10)); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0); } GPR_ASSERT(0 == - grpc_alarm_check( - NULL, gpr_time_add(start, gpr_time_from_millis(600)), NULL)); + grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(600)))); for (i = 0; i < 30; i++) { - GPR_ASSERT(cb_called[i][1] == (i < 10)); - GPR_ASSERT(cb_called[i][0] == 0); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 10)); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0); } /* collect the rest of the alarms */ GPR_ASSERT(10 == - grpc_alarm_check( - NULL, gpr_time_add(start, gpr_time_from_millis(1500)), NULL)); + grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(1500)))); for (i = 0; i < 30; i++) { - GPR_ASSERT(cb_called[i][1] == (i < 20)); - GPR_ASSERT(cb_called[i][0] == 0); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 20)); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0); } GPR_ASSERT(0 == - grpc_alarm_check( - NULL, gpr_time_add(start, gpr_time_from_millis(1600)), NULL)); + grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(1600)))); for (i = 0; i < 30; i++) { - GPR_ASSERT(cb_called[i][1] == (i < 20)); - GPR_ASSERT(cb_called[i][0] == 0); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 20)); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0); } grpc_alarm_list_shutdown(); @@ -124,16 +124,16 @@ void destruction_test() { (void *)(gpr_intptr)3, gpr_time_0); grpc_alarm_init(&alarms[4], gpr_time_from_millis(1), cb, (void *)(gpr_intptr)4, gpr_time_0); - GPR_ASSERT(1 == grpc_alarm_check(NULL, gpr_time_from_millis(2), NULL)); - GPR_ASSERT(1 == cb_called[4][1]); + GPR_ASSERT(1 == grpc_alarm_check(gpr_time_from_millis(2))); + GPR_ASSERT(1 == cb_called[4][GRPC_CALLBACK_SUCCESS]); grpc_alarm_cancel(&alarms[0]); grpc_alarm_cancel(&alarms[3]); - GPR_ASSERT(1 == cb_called[0][0]); - GPR_ASSERT(1 == cb_called[3][0]); + GPR_ASSERT(1 == cb_called[0][GRPC_CALLBACK_CANCELLED]); + GPR_ASSERT(1 == cb_called[3][GRPC_CALLBACK_CANCELLED]); grpc_alarm_list_shutdown(); - GPR_ASSERT(1 == cb_called[1][0]); - GPR_ASSERT(1 == cb_called[2][0]); + GPR_ASSERT(1 == cb_called[1][GRPC_CALLBACK_CANCELLED]); + GPR_ASSERT(1 == cb_called[2][GRPC_CALLBACK_CANCELLED]); } int main(int argc, char **argv) { diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c index 247320de04..271c42d57e 100644 --- a/test/core/iomgr/alarm_test.c +++ b/test/core/iomgr/alarm_test.c @@ -51,10 +51,8 @@ #include <grpc/support/time.h> #include "test/core/util/test_config.h" -#define SUCCESS_NOT_SET (-1) - /* Dummy gRPC callback */ -void no_op_cb(void *arg, int success) {} +void no_op_cb(void *arg, grpc_iomgr_cb_status status) {} typedef struct { gpr_cv cv; @@ -64,25 +62,27 @@ typedef struct { int done_cancel_ctr; int done; gpr_event fcb_arg; - int success; + grpc_iomgr_cb_status status; } alarm_arg; -static void followup_cb(void *arg, int success) { +static void followup_cb(void *arg, grpc_iomgr_cb_status status) { gpr_event_set((gpr_event *)arg, arg); } /* Called when an alarm expires. */ -static void alarm_cb(void *arg /* alarm_arg */, int success) { +static void alarm_cb(void *arg /* alarm_arg */, grpc_iomgr_cb_status status) { alarm_arg *a = arg; gpr_mu_lock(&a->mu); - if (success) { + if (status == GRPC_CALLBACK_SUCCESS) { a->counter++; a->done_success_ctr++; - } else { + } else if (status == GRPC_CALLBACK_CANCELLED) { a->done_cancel_ctr++; + } else { + GPR_ASSERT(0); } a->done = 1; - a->success = success; + a->status = status; gpr_cv_signal(&a->cv); gpr_mu_unlock(&a->mu); grpc_iomgr_add_callback(followup_cb, &a->fcb_arg); @@ -105,7 +105,7 @@ static void test_grpc_alarm() { grpc_iomgr_init(); arg.counter = 0; - arg.success = SUCCESS_NOT_SET; + arg.status = GRPC_CALLBACK_DO_NOT_USE; arg.done_success_ctr = 0; arg.done_cancel_ctr = 0; arg.done = 0; @@ -138,7 +138,7 @@ static void test_grpc_alarm() { } else if (arg.done_cancel_ctr != 0) { gpr_log(GPR_ERROR, "Alarm done callback called with cancel"); GPR_ASSERT(0); - } else if (arg.success == SUCCESS_NOT_SET) { + } else if (arg.status == GRPC_CALLBACK_DO_NOT_USE) { gpr_log(GPR_ERROR, "Alarm callback without status"); GPR_ASSERT(0); } else { @@ -154,7 +154,7 @@ static void test_grpc_alarm() { gpr_mu_destroy(&arg.mu); arg2.counter = 0; - arg2.success = SUCCESS_NOT_SET; + arg2.status = GRPC_CALLBACK_DO_NOT_USE; arg2.done_success_ctr = 0; arg2.done_cancel_ctr = 0; arg2.done = 0; @@ -188,7 +188,7 @@ static void test_grpc_alarm() { } else if (arg2.done_cancel_ctr + arg2.done_success_ctr != 1) { gpr_log(GPR_ERROR, "Alarm done callback called incorrect number of times"); GPR_ASSERT(0); - } else if (arg2.success == SUCCESS_NOT_SET) { + } else if (arg2.status == GRPC_CALLBACK_DO_NOT_USE) { gpr_log(GPR_ERROR, "Alarm callback without status"); GPR_ASSERT(0); } else if (arg2.done_success_ctr) { diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index 125cde4678..6a7f6afbc6 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -145,8 +145,8 @@ static void read_and_write_test_read_handler(void *data, gpr_slice *slices, gpr_cv_signal(&state->cv); gpr_mu_unlock(&state->mu); } else { - grpc_endpoint_notify_on_read(state->read_ep, - read_and_write_test_read_handler, data); + grpc_endpoint_notify_on_read( + state->read_ep, read_and_write_test_read_handler, data, gpr_inf_future); } } @@ -159,8 +159,6 @@ static void read_and_write_test_write_handler(void *data, GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR); - gpr_log(GPR_DEBUG, "%s: error=%d", __FUNCTION__, error); - if (error == GRPC_ENDPOINT_CB_SHUTDOWN) { gpr_log(GPR_INFO, "Write handler shutdown"); gpr_mu_lock(&state->mu); @@ -184,10 +182,9 @@ static void read_and_write_test_write_handler(void *data, slices = allocate_blocks(state->current_write_size, 8192, &nslices, &state->current_write_data); - write_status = - grpc_endpoint_write(state->write_ep, slices, nslices, - read_and_write_test_write_handler, state); - gpr_log(GPR_DEBUG, "write_status=%d", write_status); + write_status = grpc_endpoint_write(state->write_ep, slices, nslices, + read_and_write_test_write_handler, state, + gpr_inf_future); GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR); free(slices); if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { @@ -211,7 +208,8 @@ static void read_and_write_test(grpc_endpoint_test_config config, size_t num_bytes, size_t write_size, size_t slice_size, int shutdown) { struct read_and_write_test_state state; - gpr_timespec deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(20)); + gpr_timespec rel_deadline = {20, 0}; + gpr_timespec deadline = gpr_time_add(gpr_now(), rel_deadline); grpc_endpoint_test_fixture f = begin_test(config, __FUNCTION__, slice_size); if (shutdown) { @@ -243,22 +241,16 @@ static void read_and_write_test(grpc_endpoint_test_config config, read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK); grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler, - &state); + &state, gpr_inf_future); if (shutdown) { - gpr_log(GPR_DEBUG, "shutdown read"); grpc_endpoint_shutdown(state.read_ep); - gpr_log(GPR_DEBUG, "shutdown write"); grpc_endpoint_shutdown(state.write_ep); } gpr_mu_lock(&state.mu); while (!state.read_done || !state.write_done) { - if (gpr_cv_wait(&state.cv, &state.mu, deadline)) { - gpr_log(GPR_ERROR, "timeout: read_done=%d, write_done=%d", - state.read_done, state.write_done); - abort(); - } + GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); } gpr_mu_unlock(&state.mu); @@ -273,6 +265,79 @@ struct timeout_test_state { gpr_event io_done; }; +static void read_timeout_test_read_handler(void *data, gpr_slice *slices, + size_t nslices, + grpc_endpoint_cb_status error) { + struct timeout_test_state *state = data; + GPR_ASSERT(error == GRPC_ENDPOINT_CB_TIMED_OUT); + gpr_event_set(&state->io_done, (void *)1); +} + +static void read_timeout_test(grpc_endpoint_test_config config, + size_t slice_size) { + gpr_timespec timeout = gpr_time_from_micros(10000); + gpr_timespec read_deadline = gpr_time_add(gpr_now(), timeout); + gpr_timespec test_deadline = + gpr_time_add(gpr_now(), gpr_time_from_micros(2000000)); + struct timeout_test_state state; + grpc_endpoint_test_fixture f = begin_test(config, __FUNCTION__, slice_size); + + gpr_event_init(&state.io_done); + + grpc_endpoint_notify_on_read(f.client_ep, read_timeout_test_read_handler, + &state, read_deadline); + GPR_ASSERT(gpr_event_wait(&state.io_done, test_deadline)); + grpc_endpoint_destroy(f.client_ep); + grpc_endpoint_destroy(f.server_ep); + end_test(config); +} + +static void write_timeout_test_write_handler(void *data, + grpc_endpoint_cb_status error) { + struct timeout_test_state *state = data; + GPR_ASSERT(error == GRPC_ENDPOINT_CB_TIMED_OUT); + gpr_event_set(&state->io_done, (void *)1); +} + +static void write_timeout_test(grpc_endpoint_test_config config, + size_t slice_size) { + gpr_timespec timeout = gpr_time_from_micros(10000); + gpr_timespec write_deadline = gpr_time_add(gpr_now(), timeout); + gpr_timespec test_deadline = + gpr_time_add(gpr_now(), gpr_time_from_micros(2000000)); + struct timeout_test_state state; + int current_data = 1; + gpr_slice *slices; + size_t nblocks; + size_t size; + grpc_endpoint_test_fixture f = begin_test(config, __FUNCTION__, slice_size); + + gpr_event_init(&state.io_done); + + /* TODO(klempner): Factor this out with the equivalent code in tcp_test.c */ + for (size = 1;; size *= 2) { + slices = allocate_blocks(size, 1, &nblocks, ¤t_data); + switch (grpc_endpoint_write(f.client_ep, slices, nblocks, + write_timeout_test_write_handler, &state, + write_deadline)) { + case GRPC_ENDPOINT_WRITE_DONE: + break; + case GRPC_ENDPOINT_WRITE_ERROR: + gpr_log(GPR_ERROR, "error writing"); + abort(); + case GRPC_ENDPOINT_WRITE_PENDING: + GPR_ASSERT(gpr_event_wait(&state.io_done, test_deadline)); + gpr_free(slices); + goto exit; + } + gpr_free(slices); + } +exit: + grpc_endpoint_destroy(f.client_ep); + grpc_endpoint_destroy(f.server_ep); + end_test(config); +} + typedef struct { gpr_event ev; grpc_endpoint *ep; @@ -292,8 +357,9 @@ static void shutdown_during_write_test_read_handler( grpc_endpoint_destroy(st->ep); gpr_event_set(&st->ev, (void *)(gpr_intptr)error); } else { - grpc_endpoint_notify_on_read( - st->ep, shutdown_during_write_test_read_handler, user_data); + grpc_endpoint_notify_on_read(st->ep, + shutdown_during_write_test_read_handler, + user_data, gpr_inf_future); } } @@ -331,13 +397,14 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config, gpr_event_init(&read_st.ev); gpr_event_init(&write_st.ev); - grpc_endpoint_notify_on_read( - read_st.ep, shutdown_during_write_test_read_handler, &read_st); + grpc_endpoint_notify_on_read(read_st.ep, + shutdown_during_write_test_read_handler, + &read_st, gpr_inf_future); for (size = 1;; size *= 2) { slices = allocate_blocks(size, 1, &nblocks, ¤t_data); switch (grpc_endpoint_write(write_st.ep, slices, nblocks, shutdown_during_write_test_write_handler, - &write_st)) { + &write_st, gpr_inf_future)) { case GRPC_ENDPOINT_WRITE_DONE: break; case GRPC_ENDPOINT_WRITE_ERROR: @@ -365,5 +432,7 @@ void grpc_endpoint_tests(grpc_endpoint_test_config config) { read_and_write_test(config, 10000000, 100000, 8192, 0); read_and_write_test(config, 1000000, 100000, 1, 0); read_and_write_test(config, 100000000, 100000, 1, 1); + read_timeout_test(config, 1000); + write_timeout_test(config, 1000); shutdown_during_write_test(config, 1000); } diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 325c9f0221..c3a0afdb25 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -31,7 +31,8 @@ * */ -#include "src/core/iomgr/fd_posix.h" +/* Test gRPC event manager with a simple TCP upload server and client. */ +#include "src/core/iomgr/iomgr_libevent.h" #include <ctype.h> #include <errno.h> @@ -84,7 +85,7 @@ static void create_test_socket(int port, int *socket_fd, } /* Dummy gRPC callback */ -void no_op_cb(void *arg, int success) {} +void no_op_cb(void *arg, enum grpc_em_cb_status status) {} /* =======An upload server to test notify_on_read=========== The server simply reads and counts a stream of bytes. */ @@ -116,10 +117,10 @@ typedef struct { /* Called when an upload session can be safely shutdown. Close session FD and start to shutdown listen FD. */ static void session_shutdown_cb(void *arg, /*session*/ - int success) { + enum grpc_em_cb_status status) { session *se = arg; server *sv = se->sv; - grpc_fd_orphan(se->em_fd, NULL, NULL); + grpc_fd_destroy(se->em_fd, NULL, NULL); gpr_free(se); /* Start to shutdown listen fd. */ grpc_fd_shutdown(sv->em_fd); @@ -127,15 +128,15 @@ static void session_shutdown_cb(void *arg, /*session*/ /* Called when data become readable in a session. */ static void session_read_cb(void *arg, /*session*/ - int success) { + enum grpc_em_cb_status status) { session *se = arg; - int fd = se->em_fd->fd; + int fd = grpc_fd_get(se->em_fd); ssize_t read_once = 0; ssize_t read_total = 0; - if (!success) { - session_shutdown_cb(arg, 1); + if (status == GRPC_CALLBACK_CANCELLED) { + session_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS); return; } @@ -150,7 +151,8 @@ static void session_read_cb(void *arg, /*session*/ It is possible to read nothing due to spurious edge event or data has been drained, In such a case, read() returns -1 and set errno to EAGAIN. */ if (read_once == 0) { - session_shutdown_cb(arg, 1); + grpc_fd_shutdown(se->em_fd); + grpc_fd_notify_on_read(se->em_fd, session_read_cb, se, gpr_inf_future); } else if (read_once == -1) { if (errno == EAGAIN) { /* An edge triggered event is cached in the kernel until next poll. @@ -161,7 +163,8 @@ static void session_read_cb(void *arg, /*session*/ TODO(chenw): in multi-threaded version, callback and polling can be run in different threads. polling may catch a persist read edge event before notify_on_read is called. */ - grpc_fd_notify_on_read(se->em_fd, session_read_cb, se); + GPR_ASSERT(grpc_fd_notify_on_read(se->em_fd, session_read_cb, se, + gpr_inf_future)); } else { gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno)); GPR_ASSERT(0); @@ -171,10 +174,11 @@ static void session_read_cb(void *arg, /*session*/ /* Called when the listen FD can be safely shutdown. Close listen FD and signal that server can be shutdown. */ -static void listen_shutdown_cb(void *arg /*server*/, int success) { +static void listen_shutdown_cb(void *arg /*server*/, + enum grpc_em_cb_status status) { server *sv = arg; - grpc_fd_orphan(sv->em_fd, NULL, NULL); + grpc_fd_destroy(sv->em_fd, NULL, NULL); gpr_mu_lock(&sv->mu); sv->done = 1; @@ -184,21 +188,21 @@ static void listen_shutdown_cb(void *arg /*server*/, int success) { /* Called when a new TCP connection request arrives in the listening port. */ static void listen_cb(void *arg, /*=sv_arg*/ - int success) { + enum grpc_em_cb_status status) { server *sv = arg; int fd; int flags; session *se; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); - grpc_fd *listen_em_fd = sv->em_fd; + struct grpc_fd *listen_em_fd = sv->em_fd; - if (!success) { - listen_shutdown_cb(arg, 1); + if (status == GRPC_CALLBACK_CANCELLED) { + listen_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS); return; } - fd = accept(listen_em_fd->fd, (struct sockaddr *)&ss, &slen); + fd = accept(grpc_fd_get(listen_em_fd), (struct sockaddr *)&ss, &slen); GPR_ASSERT(fd >= 0); GPR_ASSERT(fd < FD_SETSIZE); flags = fcntl(fd, F_GETFL, 0); @@ -206,9 +210,11 @@ static void listen_cb(void *arg, /*=sv_arg*/ se = gpr_malloc(sizeof(*se)); se->sv = sv; se->em_fd = grpc_fd_create(fd); - grpc_fd_notify_on_read(se->em_fd, session_read_cb, se); + GPR_ASSERT( + grpc_fd_notify_on_read(se->em_fd, session_read_cb, se, gpr_inf_future)); - grpc_fd_notify_on_read(listen_em_fd, listen_cb, sv); + GPR_ASSERT( + grpc_fd_notify_on_read(listen_em_fd, listen_cb, sv, gpr_inf_future)); } /* Max number of connections pending to be accepted by listen(). */ @@ -233,7 +239,7 @@ static int server_start(server *sv) { sv->em_fd = grpc_fd_create(fd); /* Register to be interested in reading from listen_fd. */ - grpc_fd_notify_on_read(sv->em_fd, listen_cb, sv); + GPR_ASSERT(grpc_fd_notify_on_read(sv->em_fd, listen_cb, sv, gpr_inf_future)); return port; } @@ -279,24 +285,25 @@ static void client_init(client *cl) { } /* Called when a client upload session is ready to shutdown. */ -static void client_session_shutdown_cb(void *arg /*client*/, int success) { +static void client_session_shutdown_cb(void *arg /*client*/, + enum grpc_em_cb_status status) { client *cl = arg; - grpc_fd_orphan(cl->em_fd, NULL, NULL); + grpc_fd_destroy(cl->em_fd, NULL, NULL); + gpr_mu_lock(&cl->mu); cl->done = 1; gpr_cv_signal(&cl->done_cv); + gpr_mu_unlock(&cl->mu); } /* Write as much as possible, then register notify_on_write. */ static void client_session_write(void *arg, /*client*/ - int success) { + enum grpc_em_cb_status status) { client *cl = arg; - int fd = cl->em_fd->fd; + int fd = grpc_fd_get(cl->em_fd); ssize_t write_once = 0; - if (!success) { - gpr_mu_lock(&cl->mu); - client_session_shutdown_cb(arg, 1); - gpr_mu_unlock(&cl->mu); + if (status == GRPC_CALLBACK_CANCELLED) { + client_session_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS); return; } @@ -308,10 +315,14 @@ static void client_session_write(void *arg, /*client*/ if (errno == EAGAIN) { gpr_mu_lock(&cl->mu); if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { - grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl); + GPR_ASSERT(grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl, + gpr_inf_future)); cl->client_write_cnt++; } else { - client_session_shutdown_cb(arg, 1); + close(fd); + grpc_fd_shutdown(cl->em_fd); + grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl, + gpr_inf_future); } gpr_mu_unlock(&cl->mu); } else { @@ -333,7 +344,7 @@ static void client_start(client *cl, int port) { cl->em_fd = grpc_fd_create(fd); - client_session_write(cl, 1); + client_session_write(cl, GRPC_CALLBACK_SUCCESS); } /* Wait for the signal to shutdown a client. */ @@ -367,7 +378,7 @@ static void test_grpc_fd() { typedef struct fd_change_data { gpr_mu mu; gpr_cv cv; - void (*cb_that_ran)(void *, int success); + void (*cb_that_ran)(void *, enum grpc_em_cb_status); } fd_change_data; void init_change_data(fd_change_data *fdc) { @@ -381,7 +392,8 @@ void destroy_change_data(fd_change_data *fdc) { gpr_cv_destroy(&fdc->cv); } -static void first_read_callback(void *arg /* fd_change_data */, int success) { +static void first_read_callback(void *arg /* fd_change_data */, + enum grpc_em_cb_status status) { fd_change_data *fdc = arg; gpr_mu_lock(&fdc->mu); @@ -390,7 +402,8 @@ static void first_read_callback(void *arg /* fd_change_data */, int success) { gpr_mu_unlock(&fdc->mu); } -static void second_read_callback(void *arg /* fd_change_data */, int success) { +static void second_read_callback(void *arg /* fd_change_data */, + enum grpc_em_cb_status status) { fd_change_data *fdc = arg; gpr_mu_lock(&fdc->mu); @@ -423,7 +436,7 @@ static void test_grpc_fd_change() { em_fd = grpc_fd_create(sv[0]); /* Register the first callback, then make its FD readable */ - grpc_fd_notify_on_read(em_fd, first_read_callback, &a); + grpc_fd_notify_on_read(em_fd, first_read_callback, &a, gpr_inf_future); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -442,7 +455,7 @@ static void test_grpc_fd_change() { /* Now register a second callback with distinct change data, and do the same thing again. */ - grpc_fd_notify_on_read(em_fd, second_read_callback, &b); + grpc_fd_notify_on_read(em_fd, second_read_callback, &b, gpr_inf_future); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -455,9 +468,48 @@ static void test_grpc_fd_change() { GPR_ASSERT(b.cb_that_ran == second_read_callback); gpr_mu_unlock(&b.mu); - grpc_fd_orphan(em_fd, NULL, NULL); + grpc_fd_destroy(em_fd, NULL, NULL); destroy_change_data(&a); destroy_change_data(&b); + close(sv[0]); + close(sv[1]); +} + +void timeout_callback(void *arg, enum grpc_em_cb_status status) { + if (status == GRPC_CALLBACK_TIMED_OUT) { + gpr_event_set(arg, (void *)1); + } else { + gpr_event_set(arg, (void *)2); + } +} + +void test_grpc_fd_notify_timeout() { + grpc_fd *em_fd; + gpr_event ev; + int flags; + int sv[2]; + gpr_timespec timeout; + gpr_timespec deadline; + + gpr_event_init(&ev); + + GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); + flags = fcntl(sv[0], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); + flags = fcntl(sv[1], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); + + em_fd = grpc_fd_create(sv[0]); + + timeout = gpr_time_from_micros(1000000); + deadline = gpr_time_add(gpr_now(), timeout); + + grpc_fd_notify_on_read(em_fd, timeout_callback, &ev, deadline); + + GPR_ASSERT(gpr_event_wait(&ev, gpr_time_add(deadline, timeout))); + + GPR_ASSERT(gpr_event_get(&ev) == (void *)1); + grpc_fd_destroy(em_fd, NULL, NULL); close(sv[1]); } @@ -466,6 +518,7 @@ int main(int argc, char **argv) { grpc_iomgr_init(); test_grpc_fd(); test_grpc_fd_change(); + test_grpc_fd_notify_timeout(); grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 2d0a89a1f5..cb1cd0bc16 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -44,7 +44,7 @@ #include <grpc/support/time.h> static gpr_timespec test_deadline() { - return gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); + return gpr_time_add(gpr_now(), gpr_time_from_micros(1000000)); } static void must_succeed(void *arg, grpc_endpoint *tcp) { diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 7fd2567cec..6a4ef0f984 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -154,7 +154,7 @@ static void read_cb(void *user_data, gpr_slice *slices, size_t nslices, if (state->read_bytes >= state->target_read_bytes) { gpr_cv_signal(&state->cv); } else { - grpc_endpoint_notify_on_read(state->ep, read_cb, state); + grpc_endpoint_notify_on_read(state->ep, read_cb, state, gpr_inf_future); } gpr_mu_unlock(&state->mu); } @@ -183,7 +183,7 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { state.read_bytes = 0; state.target_read_bytes = written_bytes; - grpc_endpoint_notify_on_read(ep, read_cb, &state); + grpc_endpoint_notify_on_read(ep, read_cb, &state, gpr_inf_future); gpr_mu_lock(&state.mu); for (;;) { @@ -225,7 +225,7 @@ static void large_read_test(ssize_t slice_size) { state.read_bytes = 0; state.target_read_bytes = written_bytes; - grpc_endpoint_notify_on_read(ep, read_cb, &state); + grpc_endpoint_notify_on_read(ep, read_cb, &state, gpr_inf_future); gpr_mu_lock(&state.mu); for (;;) { @@ -363,8 +363,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); - if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state) == - GRPC_ENDPOINT_WRITE_DONE) { + if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state, + gpr_inf_future) == GRPC_ENDPOINT_WRITE_DONE) { /* Write completed immediately */ read_bytes = drain_socket(sv[0]); GPR_ASSERT(read_bytes == num_bytes); @@ -421,13 +421,15 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); - switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state)) { + switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state, + gpr_inf_future)) { case GRPC_ENDPOINT_WRITE_DONE: case GRPC_ENDPOINT_WRITE_ERROR: /* Write completed immediately */ break; case GRPC_ENDPOINT_WRITE_PENDING: - grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL); + grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL, + gpr_inf_future); gpr_mu_lock(&state.mu); for (;;) { if (state.write_done) { diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index f30ff917cb..cb77a88062 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -66,7 +66,7 @@ static void test_no_op() { static void test_no_op_with_start() { grpc_tcp_server *s = grpc_tcp_server_create(); LOG_TEST(); - grpc_tcp_server_start(s, NULL, on_connect, NULL); + grpc_tcp_server_start(s, on_connect, NULL); grpc_tcp_server_destroy(s); } @@ -93,7 +93,7 @@ static void test_no_op_with_port_and_start() { GPR_ASSERT( grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr))); - grpc_tcp_server_start(s, NULL, on_connect, NULL); + grpc_tcp_server_start(s, on_connect, NULL); grpc_tcp_server_destroy(s); } @@ -120,7 +120,7 @@ static void test_connect(int n) { GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0); GPR_ASSERT(addr_len <= sizeof(addr)); - grpc_tcp_server_start(s, NULL, on_connect, NULL); + grpc_tcp_server_start(s, on_connect, NULL); for (i = 0; i < n; i++) { deadline = gpr_time_add(gpr_now(), gpr_time_from_micros(10000000)); diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index d4baa64725..9311d6ba11 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -153,7 +153,8 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { int verified = 0; gpr_log(GPR_INFO, "Start test left over"); - grpc_endpoint_notify_on_read(f.client_ep, verify_leftover, &verified); + grpc_endpoint_notify_on_read(f.client_ep, verify_leftover, &verified, + gpr_inf_future); GPR_ASSERT(verified == 1); grpc_endpoint_shutdown(f.client_ep); @@ -186,7 +187,7 @@ static void test_destroy_ep_early(grpc_endpoint_test_config config, grpc_endpoint_test_fixture f = config.create_fixture(slice_size); gpr_log(GPR_INFO, "Start test destroy early"); - grpc_endpoint_notify_on_read(f.client_ep, destroy_early, &f); + grpc_endpoint_notify_on_read(f.client_ep, destroy_early, &f, gpr_inf_future); grpc_endpoint_shutdown(f.server_ep); grpc_endpoint_destroy(f.server_ep); diff --git a/vsprojects/vs2013/grpc.vcxproj b/vsprojects/vs2013/grpc.vcxproj index 6cc5a8b293..ffce5f6ecd 100644 --- a/vsprojects/vs2013/grpc.vcxproj +++ b/vsprojects/vs2013/grpc.vcxproj @@ -115,12 +115,10 @@ <ClInclude Include="..\..\src\core\iomgr\alarm_internal.h" /> <ClInclude Include="..\..\src\core\iomgr\endpoint.h" /> <ClInclude Include="..\..\src\core\iomgr\endpoint_pair.h" /> - <ClInclude Include="..\..\src\core\iomgr\fd_posix.h" /> + <ClInclude Include="..\..\src\core\iomgr\iomgr_completion_queue_interface.h" /> <ClInclude Include="..\..\src\core\iomgr\iomgr.h" /> - <ClInclude Include="..\..\src\core\iomgr\iomgr_internal.h" /> - <ClInclude Include="..\..\src\core\iomgr\iomgr_posix.h" /> + <ClInclude Include="..\..\src\core\iomgr\iomgr_libevent.h" /> <ClInclude Include="..\..\src\core\iomgr\pollset.h" /> - <ClInclude Include="..\..\src\core\iomgr\pollset_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\resolve_address.h" /> <ClInclude Include="..\..\src\core\iomgr\sockaddr.h" /> <ClInclude Include="..\..\src\core\iomgr\sockaddr_posix.h" /> @@ -145,9 +143,9 @@ <ClInclude Include="..\..\src\core\surface\server.h" /> <ClInclude Include="..\..\src\core\surface\surface_trace.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\bin_encoder.h" /> - <ClInclude Include="..\..\src\core\transport\chttp2\frame.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_data.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_goaway.h" /> + <ClInclude Include="..\..\src\core\transport\chttp2\frame.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_ping.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_rst_stream.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_settings.h" /> @@ -160,8 +158,8 @@ <ClInclude Include="..\..\src\core\transport\chttp2\stream_encoder.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\stream_map.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\timeout_encoding.h" /> - <ClInclude Include="..\..\src\core\transport\chttp2\varint.h" /> <ClInclude Include="..\..\src\core\transport\chttp2_transport.h" /> + <ClInclude Include="..\..\src\core\transport\chttp2\varint.h" /> <ClInclude Include="..\..\src\core\transport\metadata.h" /> <ClInclude Include="..\..\src\core\transport\stream_op.h" /> <ClInclude Include="..\..\src\core\transport\transport.h" /> @@ -238,15 +236,11 @@ </ClCompile> <ClCompile Include="..\..\src\core\iomgr\endpoint_pair_posix.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\fd_posix.c"> - </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\iomgr.c"> + <ClCompile Include="..\..\src\core\iomgr\iomgr_libevent.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\iomgr_posix.c"> + <ClCompile Include="..\..\src\core\iomgr\iomgr_libevent_use_threads.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c"> - </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\pollset_posix.c"> + <ClCompile Include="..\..\src\core\iomgr\pollset.c"> </ClCompile> <ClCompile Include="..\..\src\core\iomgr\resolve_address_posix.c"> </ClCompile> @@ -338,10 +332,10 @@ </ClCompile> <ClCompile Include="..\..\src\core\transport\chttp2\timeout_encoding.c"> </ClCompile> - <ClCompile Include="..\..\src\core\transport\chttp2\varint.c"> - </ClCompile> <ClCompile Include="..\..\src\core\transport\chttp2_transport.c"> </ClCompile> + <ClCompile Include="..\..\src\core\transport\chttp2\varint.c"> + </ClCompile> <ClCompile Include="..\..\src\core\transport\metadata.c"> </ClCompile> <ClCompile Include="..\..\src\core\transport\stream_op.c"> diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj b/vsprojects/vs2013/grpc_unsecure.vcxproj index 6cc5a8b293..ffce5f6ecd 100644 --- a/vsprojects/vs2013/grpc_unsecure.vcxproj +++ b/vsprojects/vs2013/grpc_unsecure.vcxproj @@ -115,12 +115,10 @@ <ClInclude Include="..\..\src\core\iomgr\alarm_internal.h" /> <ClInclude Include="..\..\src\core\iomgr\endpoint.h" /> <ClInclude Include="..\..\src\core\iomgr\endpoint_pair.h" /> - <ClInclude Include="..\..\src\core\iomgr\fd_posix.h" /> + <ClInclude Include="..\..\src\core\iomgr\iomgr_completion_queue_interface.h" /> <ClInclude Include="..\..\src\core\iomgr\iomgr.h" /> - <ClInclude Include="..\..\src\core\iomgr\iomgr_internal.h" /> - <ClInclude Include="..\..\src\core\iomgr\iomgr_posix.h" /> + <ClInclude Include="..\..\src\core\iomgr\iomgr_libevent.h" /> <ClInclude Include="..\..\src\core\iomgr\pollset.h" /> - <ClInclude Include="..\..\src\core\iomgr\pollset_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\resolve_address.h" /> <ClInclude Include="..\..\src\core\iomgr\sockaddr.h" /> <ClInclude Include="..\..\src\core\iomgr\sockaddr_posix.h" /> @@ -145,9 +143,9 @@ <ClInclude Include="..\..\src\core\surface\server.h" /> <ClInclude Include="..\..\src\core\surface\surface_trace.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\bin_encoder.h" /> - <ClInclude Include="..\..\src\core\transport\chttp2\frame.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_data.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_goaway.h" /> + <ClInclude Include="..\..\src\core\transport\chttp2\frame.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_ping.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_rst_stream.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\frame_settings.h" /> @@ -160,8 +158,8 @@ <ClInclude Include="..\..\src\core\transport\chttp2\stream_encoder.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\stream_map.h" /> <ClInclude Include="..\..\src\core\transport\chttp2\timeout_encoding.h" /> - <ClInclude Include="..\..\src\core\transport\chttp2\varint.h" /> <ClInclude Include="..\..\src\core\transport\chttp2_transport.h" /> + <ClInclude Include="..\..\src\core\transport\chttp2\varint.h" /> <ClInclude Include="..\..\src\core\transport\metadata.h" /> <ClInclude Include="..\..\src\core\transport\stream_op.h" /> <ClInclude Include="..\..\src\core\transport\transport.h" /> @@ -238,15 +236,11 @@ </ClCompile> <ClCompile Include="..\..\src\core\iomgr\endpoint_pair_posix.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\fd_posix.c"> - </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\iomgr.c"> + <ClCompile Include="..\..\src\core\iomgr\iomgr_libevent.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\iomgr_posix.c"> + <ClCompile Include="..\..\src\core\iomgr\iomgr_libevent_use_threads.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c"> - </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\pollset_posix.c"> + <ClCompile Include="..\..\src\core\iomgr\pollset.c"> </ClCompile> <ClCompile Include="..\..\src\core\iomgr\resolve_address_posix.c"> </ClCompile> @@ -338,10 +332,10 @@ </ClCompile> <ClCompile Include="..\..\src\core\transport\chttp2\timeout_encoding.c"> </ClCompile> - <ClCompile Include="..\..\src\core\transport\chttp2\varint.c"> - </ClCompile> <ClCompile Include="..\..\src\core\transport\chttp2_transport.c"> </ClCompile> + <ClCompile Include="..\..\src\core\transport\chttp2\varint.c"> + </ClCompile> <ClCompile Include="..\..\src\core\transport\metadata.c"> </ClCompile> <ClCompile Include="..\..\src\core\transport\stream_op.c"> |