aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar ctiller <ctiller@google.com>2015-01-07 12:13:17 -0800
committerGravatar Nicolas Noble <nnoble@google.com>2015-01-09 17:23:18 -0800
commite4b409364e4c493a66d4b2a6fe897075aa5c174e (patch)
tree29467626f50aea49e072e15004dd141625146709
parent8232204a36712553b9eedb2dacab13b7c38642c6 (diff)
Add a --forever flag, to continuously run tests as things change.
Change on 2015/01/07 by ctiller <ctiller@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=83451760
-rw-r--r--Makefile37
-rw-r--r--build.json20
-rw-r--r--include/grpc/support/port_platform.h3
-rw-r--r--include/grpc/support/time.h2
-rw-r--r--src/compiler/ruby_generator.cc5
-rw-r--r--src/core/channel/child_channel.c36
-rw-r--r--src/core/channel/child_channel.h3
-rw-r--r--src/core/channel/client_channel.c20
-rw-r--r--src/core/channel/client_setup.c5
-rw-r--r--src/core/httpcli/httpcli.c10
-rw-r--r--src/core/iomgr/alarm.c34
-rw-r--r--src/core/iomgr/alarm_internal.h5
-rw-r--r--src/core/iomgr/endpoint.c14
-rw-r--r--src/core/iomgr/endpoint.h17
-rw-r--r--src/core/iomgr/fd_posix.c274
-rw-r--r--src/core/iomgr/fd_posix.h138
-rw-r--r--src/core/iomgr/iomgr.c204
-rw-r--r--src/core/iomgr/iomgr.h11
-rw-r--r--src/core/iomgr/iomgr_completion_queue_interface.h (renamed from src/core/iomgr/iomgr_posix.h)15
-rw-r--r--src/core/iomgr/iomgr_libevent.c652
-rw-r--r--src/core/iomgr/iomgr_libevent.h206
-rw-r--r--src/core/iomgr/iomgr_libevent_use_threads.c (renamed from src/core/iomgr/iomgr_internal.h)35
-rw-r--r--src/core/iomgr/pollset.c (renamed from src/core/iomgr/iomgr_posix.c)7
-rw-r--r--src/core/iomgr/pollset.h21
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c239
-rw-r--r--src/core/iomgr/pollset_posix.c342
-rw-r--r--src/core/iomgr/pollset_posix.h95
-rw-r--r--src/core/iomgr/resolve_address_posix.c6
-rw-r--r--src/core/iomgr/tcp_client_posix.c76
-rw-r--r--src/core/iomgr/tcp_posix.c67
-rw-r--r--src/core/iomgr/tcp_posix.h2
-rw-r--r--src/core/iomgr/tcp_server.h4
-rw-r--r--src/core/iomgr/tcp_server_posix.c38
-rw-r--r--src/core/security/credentials.c5
-rw-r--r--src/core/security/secure_endpoint.c15
-rw-r--r--src/core/security/secure_transport_setup.c12
-rw-r--r--src/core/security/server_secure_chttp2.c4
-rw-r--r--src/core/support/time.c15
-rw-r--r--src/core/surface/call.c4
-rw-r--r--src/core/surface/completion_queue.c66
-rw-r--r--src/core/surface/server.c13
-rw-r--r--src/core/surface/server.h3
-rw-r--r--src/core/surface/server_chttp2.c4
-rw-r--r--src/core/transport/chttp2_transport.c5
-rwxr-xr-xsrc/ruby/Gemfile8
-rwxr-xr-xsrc/ruby/README.md46
-rw-r--r--src/ruby/bin/interop/interop_client.rb25
-rw-r--r--src/ruby/bin/interop/interop_server.rb9
-rwxr-xr-xsrc/ruby/bin/interop/net/proto2/bridge/proto/message_set.pb.rb14
-rwxr-xr-xsrc/ruby/bin/interop/net/proto2/proto/empty.pb.rb12
-rw-r--r--src/ruby/bin/interop/test/cpp/interop/empty.rb44
-rw-r--r--src/ruby/bin/interop/test/cpp/interop/messages.rb86
-rw-r--r--src/ruby/bin/interop/test/cpp/interop/test.rb43
-rw-r--r--src/ruby/bin/interop/test/cpp/interop/test_services.rb60
-rwxr-xr-xsrc/ruby/bin/interop/third_party/stubby/testing/proto/messages.pb.rb94
-rwxr-xr-xsrc/ruby/bin/interop/third_party/stubby/testing/proto/test.pb.rb30
-rwxr-xr-xsrc/ruby/bin/math.pb.rb65
-rwxr-xr-xsrc/ruby/bin/math.proto14
-rw-r--r--src/ruby/bin/math.rb61
-rw-r--r--src/ruby/bin/math_client.rb6
-rw-r--r--src/ruby/bin/math_server.rb6
-rw-r--r--src/ruby/bin/math_services.rb56
-rw-r--r--src/ruby/bin/noproto_client.rb62
-rw-r--r--src/ruby/bin/noproto_server.rb54
-rwxr-xr-xsrc/ruby/grpc.gemspec2
-rw-r--r--src/ruby/lib/grpc.rb1
-rw-r--r--src/ruby/lib/grpc/generic/service.rb11
-rw-r--r--src/ruby/spec/generic/service_spec.rb33
-rw-r--r--templates/Makefile.template17
-rw-r--r--test/core/end2end/cq_verifier.c7
-rw-r--r--test/core/end2end/fixtures/chttp2_socket_pair.c1
-rw-r--r--test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c1
-rw-r--r--test/core/end2end/tests/invoke_large_request.c2
-rw-r--r--test/core/end2end/tests/max_concurrent_streams.c56
-rw-r--r--test/core/iomgr/alarm_list_test.c50
-rw-r--r--test/core/iomgr/alarm_test.c26
-rw-r--r--test/core/iomgr/endpoint_tests.c113
-rw-r--r--test/core/iomgr/fd_posix_test.c127
-rw-r--r--test/core/iomgr/tcp_client_posix_test.c2
-rw-r--r--test/core/iomgr/tcp_posix_test.c16
-rw-r--r--test/core/iomgr/tcp_server_posix_test.c6
-rw-r--r--test/core/security/secure_endpoint_test.c5
-rw-r--r--vsprojects/vs2013/grpc.vcxproj24
-rw-r--r--vsprojects/vs2013/grpc_unsecure.vcxproj24
84 files changed, 1775 insertions, 2333 deletions
diff --git a/Makefile b/Makefile
index 829241b9d5..49b0bd629d 100644
--- a/Makefile
+++ b/Makefile
@@ -84,7 +84,7 @@ CPPFLAGS += -g -fPIC -Wall -Werror -Wno-long-long
LDFLAGS += -g -pthread -fPIC
INCLUDES = . include gens
-LIBS = rt m z pthread
+LIBS = rt m z event event_pthreads pthread
LIBSXX = protobuf
LIBS_PROTOC = protoc protobuf
@@ -143,12 +143,20 @@ else
IS_GIT_FOLDER = true
endif
+EVENT2_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/event2.c -levent $(LDFLAGS)
OPENSSL_ALPN_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/openssl-alpn.c -lssl -lcrypto -ldl $(LDFLAGS)
ZLIB_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/zlib.c -lz $(LDFLAGS)
+HAS_SYSTEM_EVENT2 = $(shell $(EVENT2_CHECK_CMD) 2> /dev/null && echo true || echo false)
HAS_SYSTEM_OPENSSL_ALPN = $(shell $(OPENSSL_ALPN_CHECK_CMD) 2> /dev/null && echo true || echo false)
HAS_SYSTEM_ZLIB = $(shell $(ZLIB_CHECK_CMD) 2> /dev/null && echo true || echo false)
+ifeq ($(wildcard third_party/libevent/include/event2/event.h),)
+HAS_EMBEDDED_EVENT2 = false
+else
+HAS_EMBEDDED_EVENT2 = true
+endif
+
ifeq ($(wildcard third_party/openssl/ssl/ssl.h),)
HAS_EMBEDDED_OPENSSL_ALPN = false
else
@@ -161,6 +169,12 @@ else
HAS_EMBEDDED_ZLIB = true
endif
+ifneq ($(SYSTEM),MINGW32)
+ifeq ($(HAS_SYSTEM_EVENT2),false)
+DEP_MISSING += libevent
+endif
+endif
+
ifeq ($(HAS_SYSTEM_ZLIB),false)
ifeq ($(HAS_EMBEDDED_ZLIB),true)
ZLIB_DEP = third_party/zlib/libz.a
@@ -452,6 +466,7 @@ chttp2_socket_pair_one_byte_at_a_time_thread_stress_test: bins/$(CONFIG)/chttp2_
chttp2_socket_pair_one_byte_at_a_time_writes_done_hangs_with_pending_read_test: bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_writes_done_hangs_with_pending_read_test
run_dep_checks:
+ $(EVENT2_CHECK_CMD) || true
$(OPENSSL_ALPN_CHECK_CMD) || true
$(ZLIB_CHECK_CMD) || true
@@ -1214,11 +1229,9 @@ LIBGRPC_SRC = \
src/core/iomgr/alarm_heap.c \
src/core/iomgr/endpoint.c \
src/core/iomgr/endpoint_pair_posix.c \
- src/core/iomgr/fd_posix.c \
- src/core/iomgr/iomgr.c \
- src/core/iomgr/iomgr_posix.c \
- src/core/iomgr/pollset_multipoller_with_poll_posix.c \
- src/core/iomgr/pollset_posix.c \
+ src/core/iomgr/iomgr_libevent.c \
+ src/core/iomgr/iomgr_libevent_use_threads.c \
+ src/core/iomgr/pollset.c \
src/core/iomgr/resolve_address_posix.c \
src/core/iomgr/sockaddr_utils.c \
src/core/iomgr/socket_utils_common_posix.c \
@@ -1264,8 +1277,8 @@ LIBGRPC_SRC = \
src/core/transport/chttp2/stream_encoder.c \
src/core/transport/chttp2/stream_map.c \
src/core/transport/chttp2/timeout_encoding.c \
- src/core/transport/chttp2/varint.c \
src/core/transport/chttp2_transport.c \
+ src/core/transport/chttp2/varint.c \
src/core/transport/metadata.c \
src/core/transport/stream_op.c \
src/core/transport/transport.c \
@@ -1366,11 +1379,9 @@ LIBGRPC_UNSECURE_SRC = \
src/core/iomgr/alarm_heap.c \
src/core/iomgr/endpoint.c \
src/core/iomgr/endpoint_pair_posix.c \
- src/core/iomgr/fd_posix.c \
- src/core/iomgr/iomgr.c \
- src/core/iomgr/iomgr_posix.c \
- src/core/iomgr/pollset_multipoller_with_poll_posix.c \
- src/core/iomgr/pollset_posix.c \
+ src/core/iomgr/iomgr_libevent.c \
+ src/core/iomgr/iomgr_libevent_use_threads.c \
+ src/core/iomgr/pollset.c \
src/core/iomgr/resolve_address_posix.c \
src/core/iomgr/sockaddr_utils.c \
src/core/iomgr/socket_utils_common_posix.c \
@@ -1416,8 +1427,8 @@ LIBGRPC_UNSECURE_SRC = \
src/core/transport/chttp2/stream_encoder.c \
src/core/transport/chttp2/stream_map.c \
src/core/transport/chttp2/timeout_encoding.c \
- src/core/transport/chttp2/varint.c \
src/core/transport/chttp2_transport.c \
+ src/core/transport/chttp2/varint.c \
src/core/transport/metadata.c \
src/core/transport/stream_op.c \
src/core/transport/transport.c \
diff --git a/build.json b/build.json
index e0e05c8eaf..e0bf280635 100644
--- a/build.json
+++ b/build.json
@@ -35,11 +35,9 @@
"src/core/iomgr/alarm_heap.c",
"src/core/iomgr/endpoint.c",
"src/core/iomgr/endpoint_pair_posix.c",
- "src/core/iomgr/fd_posix.c",
- "src/core/iomgr/iomgr.c",
- "src/core/iomgr/iomgr_posix.c",
- "src/core/iomgr/pollset_multipoller_with_poll_posix.c",
- "src/core/iomgr/pollset_posix.c",
+ "src/core/iomgr/iomgr_libevent.c",
+ "src/core/iomgr/iomgr_libevent_use_threads.c",
+ "src/core/iomgr/pollset.c",
"src/core/iomgr/resolve_address_posix.c",
"src/core/iomgr/sockaddr_utils.c",
"src/core/iomgr/socket_utils_common_posix.c",
@@ -85,8 +83,8 @@
"src/core/transport/chttp2/stream_encoder.c",
"src/core/transport/chttp2/stream_map.c",
"src/core/transport/chttp2/timeout_encoding.c",
- "src/core/transport/chttp2/varint.c",
"src/core/transport/chttp2_transport.c",
+ "src/core/transport/chttp2/varint.c",
"src/core/transport/metadata.c",
"src/core/transport/stream_op.c",
"src/core/transport/transport.c",
@@ -122,12 +120,10 @@
"src/core/iomgr/alarm_internal.h",
"src/core/iomgr/endpoint.h",
"src/core/iomgr/endpoint_pair.h",
- "src/core/iomgr/fd_posix.h",
+ "src/core/iomgr/iomgr_completion_queue_interface.h",
"src/core/iomgr/iomgr.h",
- "src/core/iomgr/iomgr_internal.h",
- "src/core/iomgr/iomgr_posix.h",
+ "src/core/iomgr/iomgr_libevent.h",
"src/core/iomgr/pollset.h",
- "src/core/iomgr/pollset_posix.h",
"src/core/iomgr/resolve_address.h",
"src/core/iomgr/sockaddr.h",
"src/core/iomgr/sockaddr_posix.h",
@@ -152,9 +148,9 @@
"src/core/surface/server.h",
"src/core/surface/surface_trace.h",
"src/core/transport/chttp2/bin_encoder.h",
- "src/core/transport/chttp2/frame.h",
"src/core/transport/chttp2/frame_data.h",
"src/core/transport/chttp2/frame_goaway.h",
+ "src/core/transport/chttp2/frame.h",
"src/core/transport/chttp2/frame_ping.h",
"src/core/transport/chttp2/frame_rst_stream.h",
"src/core/transport/chttp2/frame_settings.h",
@@ -167,8 +163,8 @@
"src/core/transport/chttp2/stream_encoder.h",
"src/core/transport/chttp2/stream_map.h",
"src/core/transport/chttp2/timeout_encoding.h",
- "src/core/transport/chttp2/varint.h",
"src/core/transport/chttp2_transport.h",
+ "src/core/transport/chttp2/varint.h",
"src/core/transport/metadata.h",
"src/core/transport/stream_op.h",
"src/core/transport/transport.h",
diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h
index 27a7b5529f..9e5c9ff2ac 100644
--- a/include/grpc/support/port_platform.h
+++ b/include/grpc/support/port_platform.h
@@ -54,7 +54,6 @@
#define GPR_CPU_LINUX 1
#define GPR_GCC_SYNC 1
#define GPR_LIBEVENT 1
-#define GPR_POSIX_MULTIPOLL_WITH_POLL 1
#define GPR_POSIX_SOCKET 1
#define GPR_POSIX_SOCKETADDR 1
#define GPR_POSIX_SOCKETUTILS 1
@@ -66,7 +65,6 @@
#define GPR_GCC_ATOMIC 1
#define GPR_LIBEVENT 1
#define GPR_LINUX 1
-#define GPR_POSIX_MULTIPOLL_WITH_POLL 1
#define GPR_POSIX_SOCKET 1
#define GPR_POSIX_SOCKETADDR 1
#define GPR_POSIX_STRING 1
@@ -82,7 +80,6 @@
#define GPR_GCC_ATOMIC 1
#define GPR_LIBEVENT 1
#define GPR_POSIX_LOG 1
-#define GPR_POSIX_MULTIPOLL_WITH_POLL 1
#define GPR_POSIX_SOCKET 1
#define GPR_POSIX_SOCKETADDR 1
#define GPR_POSIX_SOCKETUTILS 1
diff --git a/include/grpc/support/time.h b/include/grpc/support/time.h
index 41d1e88dc9..5a57d94768 100644
--- a/include/grpc/support/time.h
+++ b/include/grpc/support/time.h
@@ -94,8 +94,6 @@ gpr_timespec gpr_time_from_seconds(long x);
gpr_timespec gpr_time_from_minutes(long x);
gpr_timespec gpr_time_from_hours(long x);
-gpr_int32 gpr_time_to_millis(gpr_timespec timespec);
-
/* Return 1 if two times are equal or within threshold of each other,
0 otherwise */
int gpr_time_similar(gpr_timespec a, gpr_timespec b, gpr_timespec threshold);
diff --git a/src/compiler/ruby_generator.cc b/src/compiler/ruby_generator.cc
index 20485b47a5..c5c5884736 100644
--- a/src/compiler/ruby_generator.cc
+++ b/src/compiler/ruby_generator.cc
@@ -104,11 +104,6 @@ void PrintService(const ServiceDescriptor* service, const string& package,
out->Print("\n");
out->Print("self.marshal_class_method = :encode\n");
out->Print("self.unmarshal_class_method = :decode\n");
- map<string, string> pkg_vars = ListToDict({
- "service.name", service->name(),
- "pkg.name", package,
- });
- out->Print(pkg_vars, "self.service_name = '$pkg.name$.$service.name$'\n");
out->Print("\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintMethod(service->method(i), package, out);
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
index 3778f4fb88..e67b823697 100644
--- a/src/core/channel/child_channel.c
+++ b/src/core/channel/child_channel.c
@@ -85,19 +85,19 @@ static void lb_channel_op(grpc_channel_element *elem,
grpc_channel_op *op) {
lb_channel_data *chand = elem->channel_data;
grpc_channel_element *back;
- int calling_back = 0;
switch (op->dir) {
case GRPC_CALL_UP:
gpr_mu_lock(&chand->mu);
back = chand->back;
- if (back) {
- chand->calling_back++;
- calling_back = 1;
- }
+ if (back) chand->calling_back++;
gpr_mu_unlock(&chand->mu);
if (back) {
back->filter->channel_op(chand->back, elem, op);
+ gpr_mu_lock(&chand->mu);
+ chand->calling_back--;
+ gpr_cv_broadcast(&chand->cv);
+ gpr_mu_unlock(&chand->mu);
} else if (op->type == GRPC_TRANSPORT_GOAWAY) {
gpr_slice_unref(op->data.goaway.message);
}
@@ -107,27 +107,23 @@ static void lb_channel_op(grpc_channel_element *elem,
break;
}
- gpr_mu_lock(&chand->mu);
switch (op->type) {
case GRPC_TRANSPORT_CLOSED:
+ gpr_mu_lock(&chand->mu);
chand->disconnected = 1;
maybe_destroy_channel(grpc_channel_stack_from_top_element(elem));
+ gpr_mu_unlock(&chand->mu);
break;
case GRPC_CHANNEL_GOAWAY:
+ gpr_mu_lock(&chand->mu);
chand->sent_goaway = 1;
+ gpr_mu_unlock(&chand->mu);
break;
case GRPC_CHANNEL_DISCONNECT:
case GRPC_TRANSPORT_GOAWAY:
case GRPC_ACCEPT_CALL:
break;
}
-
- if (calling_back) {
- chand->calling_back--;
- gpr_cv_signal(&chand->cv);
- maybe_destroy_channel(grpc_channel_stack_from_top_element(elem));
- }
- gpr_mu_unlock(&chand->mu);
}
/* Constructor for call_data */
@@ -181,9 +177,7 @@ const grpc_channel_filter grpc_child_channel_top_filter = {
#define LINK_BACK_ELEM_FROM_CALL(call) grpc_call_stack_element((call), 0)
-static void finally_destroy_channel(void *c, int success) {
- /* ignore success or not... this is a destruction callback and will only
- happen once - the only purpose here is to release resources */
+static void finally_destroy_channel(void *c, grpc_iomgr_cb_status status) {
grpc_child_channel *channel = c;
lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
/* wait for the initiator to leave the mutex */
@@ -193,7 +187,7 @@ static void finally_destroy_channel(void *c, int success) {
gpr_free(channel);
}
-static void send_farewells(void *c, int success) {
+static void send_farewells(void *c, grpc_iomgr_cb_status status) {
grpc_child_channel *channel = c;
grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel);
lb_channel_data *chand = lbelem->channel_data;
@@ -227,7 +221,7 @@ static void send_farewells(void *c, int success) {
static void maybe_destroy_channel(grpc_child_channel *channel) {
lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
if (chand->destroyed && chand->disconnected && chand->active_calls == 0 &&
- !chand->sending_farewell && !chand->calling_back) {
+ !chand->sending_farewell) {
grpc_iomgr_add_callback(finally_destroy_channel, channel);
} else if (chand->destroyed && !chand->disconnected &&
chand->active_calls == 0 && !chand->sending_farewell &&
@@ -255,16 +249,14 @@ grpc_child_channel *grpc_child_channel_create(
return stk;
}
-void grpc_child_channel_destroy(grpc_child_channel *channel,
- int wait_for_callbacks) {
+void grpc_child_channel_destroy(grpc_child_channel *channel) {
grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel);
lb_channel_data *chand = lbelem->channel_data;
gpr_mu_lock(&chand->mu);
- while (wait_for_callbacks && chand->calling_back) {
+ while (chand->calling_back) {
gpr_cv_wait(&chand->cv, &chand->mu, gpr_inf_future);
}
-
chand->back = NULL;
chand->destroyed = 1;
maybe_destroy_channel(channel);
diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h
index 3ba4c1b8a9..9fb2a17e29 100644
--- a/src/core/channel/child_channel.h
+++ b/src/core/channel/child_channel.h
@@ -53,8 +53,7 @@ void grpc_child_channel_handle_op(grpc_child_channel *channel,
grpc_channel_op *op);
grpc_channel_element *grpc_child_channel_get_bottom_element(
grpc_child_channel *channel);
-void grpc_child_channel_destroy(grpc_child_channel *channel,
- int wait_for_callbacks);
+void grpc_child_channel_destroy(grpc_child_channel *channel);
grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
grpc_call_element *parent);
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 46283835a0..fd883a08ca 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -294,6 +294,14 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
}
}
+static void finally_destroy_channel(void *arg, grpc_iomgr_cb_status status) {
+ grpc_child_channel_destroy(arg);
+}
+
+static void destroy_channel_later(grpc_child_channel *channel) {
+ grpc_iomgr_add_callback(finally_destroy_channel, channel);
+}
+
static void channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem, grpc_channel_op *op) {
channel_data *chand = elem->channel_data;
@@ -309,7 +317,7 @@ static void channel_op(grpc_channel_element *elem,
gpr_mu_unlock(&chand->mu);
if (child_channel) {
grpc_child_channel_handle_op(child_channel, op);
- grpc_child_channel_destroy(child_channel, 1);
+ destroy_channel_later(child_channel);
} else {
gpr_slice_unref(op->data.goaway.message);
}
@@ -321,7 +329,7 @@ static void channel_op(grpc_channel_element *elem,
chand->active_child = NULL;
gpr_mu_unlock(&chand->mu);
if (child_channel) {
- grpc_child_channel_destroy(child_channel, 1);
+ destroy_channel_later(child_channel);
}
break;
case GRPC_TRANSPORT_GOAWAY:
@@ -336,7 +344,7 @@ static void channel_op(grpc_channel_element *elem,
}
gpr_mu_unlock(&chand->mu);
if (child_channel) {
- grpc_child_channel_destroy(child_channel, 0);
+ destroy_channel_later(child_channel);
}
gpr_slice_unref(op->data.goaway.message);
break;
@@ -352,7 +360,7 @@ static void channel_op(grpc_channel_element *elem,
}
gpr_mu_unlock(&chand->mu);
if (child_channel) {
- grpc_child_channel_destroy(child_channel, 0);
+ destroy_channel_later(child_channel);
}
break;
default:
@@ -437,7 +445,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
grpc_transport_setup_cancel(chand->transport_setup);
if (chand->active_child) {
- grpc_child_channel_destroy(chand->active_child, 1);
+ grpc_child_channel_destroy(chand->active_child);
chand->active_child = NULL;
}
@@ -541,7 +549,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
gpr_free(child_filters);
if (old_active) {
- grpc_child_channel_destroy(old_active, 1);
+ grpc_child_channel_destroy(old_active);
}
return result;
diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c
index ebaf816902..b1194e278d 100644
--- a/src/core/channel/client_setup.c
+++ b/src/core/channel/client_setup.c
@@ -166,7 +166,8 @@ int grpc_client_setup_request_should_continue(grpc_client_setup_request *r) {
return result;
}
-static void backoff_alarm_done(void *arg /* grpc_client_setup */, int success) {
+static void backoff_alarm_done(void *arg /* grpc_client_setup */,
+ grpc_iomgr_cb_status status) {
grpc_client_setup *s = arg;
grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request));
r->setup = s;
@@ -176,7 +177,7 @@ static void backoff_alarm_done(void *arg /* grpc_client_setup */, int success) {
gpr_mu_lock(&s->mu);
s->active_request = r;
s->in_alarm = 0;
- if (!success) {
+ if (status != GRPC_CALLBACK_SUCCESS) {
if (0 == --s->refs) {
gpr_mu_unlock(&s->mu);
destroy_setup(s);
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 2143eeb63d..06d73e40f5 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -101,11 +101,12 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
switch (status) {
case GRPC_ENDPOINT_CB_OK:
- grpc_endpoint_notify_on_read(req->ep, on_read, req);
+ grpc_endpoint_notify_on_read(req->ep, on_read, req, gpr_inf_future);
break;
case GRPC_ENDPOINT_CB_EOF:
case GRPC_ENDPOINT_CB_ERROR:
case GRPC_ENDPOINT_CB_SHUTDOWN:
+ case GRPC_ENDPOINT_CB_TIMED_OUT:
if (!req->have_read_byte) {
next_address(req);
} else {
@@ -122,7 +123,7 @@ done:
static void on_written(internal_request *req) {
gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
- grpc_endpoint_notify_on_read(req->ep, on_read, req);
+ grpc_endpoint_notify_on_read(req->ep, on_read, req, gpr_inf_future);
}
static void done_write(void *arg, grpc_endpoint_cb_status status) {
@@ -135,6 +136,7 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) {
case GRPC_ENDPOINT_CB_EOF:
case GRPC_ENDPOINT_CB_SHUTDOWN:
case GRPC_ENDPOINT_CB_ERROR:
+ case GRPC_ENDPOINT_CB_TIMED_OUT:
next_address(req);
break;
}
@@ -143,8 +145,8 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) {
static void start_write(internal_request *req) {
gpr_slice_ref(req->request_text);
gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
- switch (
- grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) {
+ switch (grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req,
+ gpr_inf_future)) {
case GRPC_ENDPOINT_WRITE_DONE:
on_written(req);
break;
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c
index 2664879323..b7238f716a 100644
--- a/src/core/iomgr/alarm.c
+++ b/src/core/iomgr/alarm.c
@@ -71,8 +71,8 @@ static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
static shard_type *g_shard_queue[NUM_SHARDS];
-static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
- gpr_timespec *next, int success);
+static int run_some_expired_alarms(gpr_timespec now,
+ grpc_iomgr_cb_status status);
static gpr_timespec compute_min_deadline(shard_type *shard) {
return grpc_alarm_heap_is_empty(&shard->heap)
@@ -102,7 +102,7 @@ void grpc_alarm_list_init(gpr_timespec now) {
void grpc_alarm_list_shutdown() {
int i;
- while (run_some_expired_alarms(NULL, gpr_inf_future, NULL, 0))
+ while (run_some_expired_alarms(gpr_inf_future, GRPC_CALLBACK_CANCELLED))
;
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
@@ -233,7 +233,7 @@ void grpc_alarm_cancel(grpc_alarm *alarm) {
gpr_mu_unlock(&shard->mu);
if (triggered) {
- alarm->cb(alarm->cb_arg, 0);
+ alarm->cb(alarm->cb_arg, GRPC_CALLBACK_CANCELLED);
}
}
@@ -299,8 +299,8 @@ static size_t pop_alarms(shard_type *shard, gpr_timespec now,
return n;
}
-static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
- gpr_timespec *next, int success) {
+static int run_some_expired_alarms(gpr_timespec now,
+ grpc_iomgr_cb_status status) {
size_t n = 0;
size_t i;
grpc_alarm *alarms[MAX_ALARMS_PER_CHECK];
@@ -329,35 +329,19 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
note_deadline_change(g_shard_queue[0]);
}
- if (next) {
- *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline);
- }
-
gpr_mu_unlock(&g_mu);
gpr_mu_unlock(&g_checker_mu);
- } else if (next) {
- gpr_mu_lock(&g_mu);
- *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline);
- gpr_mu_unlock(&g_mu);
- }
-
- if (n && drop_mu) {
- gpr_mu_unlock(drop_mu);
}
for (i = 0; i < n; i++) {
- alarms[i]->cb(alarms[i]->cb_arg, success);
- }
-
- if (n && drop_mu) {
- gpr_mu_lock(drop_mu);
+ alarms[i]->cb(alarms[i]->cb_arg, status);
}
return n;
}
-int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) {
- return run_some_expired_alarms(drop_mu, now, next, 1);
+int grpc_alarm_check(gpr_timespec now) {
+ return run_some_expired_alarms(now, GRPC_CALLBACK_SUCCESS);
}
gpr_timespec grpc_alarm_list_next_timeout() {
diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h
index 12b6ab4286..e605ff84f9 100644
--- a/src/core/iomgr/alarm_internal.h
+++ b/src/core/iomgr/alarm_internal.h
@@ -34,12 +34,9 @@
#ifndef __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_
#define __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_
-#include <grpc/support/sync.h>
-#include <grpc/support/time.h>
-
/* iomgr internal api for dealing with alarms */
-int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next);
+int grpc_alarm_check(gpr_timespec now);
void grpc_alarm_list_init(gpr_timespec now);
void grpc_alarm_list_shutdown();
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index 9e5d56389d..f1944bf672 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -34,16 +34,14 @@
#include "src/core/iomgr/endpoint.h"
void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data) {
- ep->vtable->notify_on_read(ep, cb, user_data);
+ void *user_data, gpr_timespec deadline) {
+ ep->vtable->notify_on_read(ep, cb, user_data, deadline);
}
-grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data) {
- return ep->vtable->write(ep, slices, nslices, cb, user_data);
+grpc_endpoint_write_status grpc_endpoint_write(
+ grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) {
+ return ep->vtable->write(ep, slices, nslices, cb, user_data, deadline);
}
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index ec86d9a146..bbd800bea8 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -48,7 +48,8 @@ typedef enum grpc_endpoint_cb_status {
GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */
GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */
GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */
- GRPC_ENDPOINT_CB_ERROR /* Call interrupted by socket error */
+ GRPC_ENDPOINT_CB_ERROR, /* Call interrupted by socket error */
+ GRPC_ENDPOINT_CB_TIMED_OUT /* Call timed out */
} grpc_endpoint_cb_status;
typedef enum grpc_endpoint_write_status {
@@ -65,10 +66,10 @@ typedef void (*grpc_endpoint_write_cb)(void *user_data,
struct grpc_endpoint_vtable {
void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data);
+ void *user_data, gpr_timespec deadline);
grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices,
size_t nslices, grpc_endpoint_write_cb cb,
- void *user_data);
+ void *user_data, gpr_timespec deadline);
void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*shutdown)(grpc_endpoint *ep);
void (*destroy)(grpc_endpoint *ep);
@@ -76,7 +77,7 @@ struct grpc_endpoint_vtable {
/* When data is available on the connection, calls the callback with slices. */
void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data);
+ void *user_data, gpr_timespec deadline);
/* Write slices out to the socket.
@@ -84,11 +85,9 @@ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
returns GRPC_ENDPOINT_WRITE_DONE.
Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the
connection is ready for more data. */
-grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data);
+grpc_endpoint_write_status grpc_endpoint_write(
+ grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline);
/* Causes any pending read/write callbacks to run immediately with
GRPC_ENDPOINT_CB_SHUTDOWN status */
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
deleted file mode 100644
index 3cd2f9a8e0..0000000000
--- a/src/core/iomgr/fd_posix.c
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/iomgr/fd_posix.h"
-
-#include <assert.h>
-#include <unistd.h>
-
-#include "src/core/iomgr/iomgr_internal.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/useful.h>
-
-enum descriptor_state { NOT_READY, READY, WAITING };
-
-static void destroy(grpc_fd *fd) {
- grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
- gpr_mu_destroy(&fd->set_state_mu);
- gpr_free(fd->watchers);
- gpr_free(fd);
- grpc_iomgr_unref();
-}
-
-static void ref_by(grpc_fd *fd, int n) {
- gpr_atm_no_barrier_fetch_add(&fd->refst, n);
-}
-
-static void unref_by(grpc_fd *fd, int n) {
- if (gpr_atm_full_fetch_add(&fd->refst, -n) == n) {
- destroy(fd);
- }
-}
-
-static void do_nothing(void *ignored, int success) {}
-
-grpc_fd *grpc_fd_create(int fd) {
- grpc_fd *r = gpr_malloc(sizeof(grpc_fd));
- grpc_iomgr_ref();
- gpr_atm_rel_store(&r->refst, 1);
- gpr_atm_rel_store(&r->readst.state, NOT_READY);
- gpr_atm_rel_store(&r->writest.state, NOT_READY);
- gpr_mu_init(&r->set_state_mu);
- gpr_mu_init(&r->watcher_mu);
- gpr_atm_rel_store(&r->shutdown, 0);
- r->fd = fd;
- r->watchers = NULL;
- r->watcher_count = 0;
- r->watcher_capacity = 0;
- grpc_pollset_add_fd(grpc_backup_pollset(), r);
- return r;
-}
-
-int grpc_fd_is_orphaned(grpc_fd *fd) {
- return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
-}
-
-static void wake_watchers(grpc_fd *fd) {
- size_t i, n;
- gpr_mu_lock(&fd->watcher_mu);
- n = fd->watcher_count;
- for (i = 0; i < n; i++) {
- grpc_pollset_force_kick(fd->watchers[i]);
- }
- gpr_mu_unlock(&fd->watcher_mu);
-}
-
-void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
- fd->on_done = on_done ? on_done : do_nothing;
- fd->on_done_user_data = user_data;
- ref_by(fd, 1); /* remove active status, but keep referenced */
- wake_watchers(fd);
- close(fd->fd);
- unref_by(fd, 2); /* drop the reference */
-}
-
-/* increment refcount by two to avoid changing the orphan bit */
-void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
-
-void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
-
-typedef struct {
- grpc_iomgr_cb_func cb;
- void *arg;
-} callback;
-
-static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
- int allow_synchronous_callback) {
- if (allow_synchronous_callback) {
- cb(arg, success);
- } else {
- grpc_iomgr_add_delayed_callback(cb, arg, success);
- }
-}
-
-static void make_callbacks(callback *callbacks, size_t n, int success,
- int allow_synchronous_callback) {
- size_t i;
- for (i = 0; i < n; i++) {
- make_callback(callbacks[i].cb, callbacks[i].arg, success,
- allow_synchronous_callback);
- }
-}
-
-static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb,
- void *arg, int allow_synchronous_callback) {
- switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
- case NOT_READY:
- /* There is no race if the descriptor is already ready, so we skip
- the interlocked op in that case. As long as the app doesn't
- try to set the same upcall twice (which it shouldn't) then
- oldval should never be anything other than READY or NOT_READY. We
- don't
- check for user error on the fast path. */
- st->cb = cb;
- st->cb_arg = arg;
- if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) {
- /* swap was successful -- the closure will run after the next
- set_ready call. NOTE: we don't have an ABA problem here,
- since we should never have concurrent calls to the same
- notify_on function. */
- wake_watchers(fd);
- return;
- }
- /* swap was unsuccessful due to an intervening set_ready call.
- Fall through to the READY code below */
- case READY:
- assert(gpr_atm_acq_load(&st->state) == READY);
- gpr_atm_rel_store(&st->state, NOT_READY);
- make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown),
- allow_synchronous_callback);
- return;
- case WAITING:
- /* upcallptr was set to a different closure. This is an error! */
- gpr_log(GPR_ERROR,
- "User called a notify_on function with a previous callback still "
- "pending");
- abort();
- }
- gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
- abort();
-}
-
-static void set_ready_locked(grpc_fd_state *st, callback *callbacks,
- size_t *ncallbacks) {
- callback *c;
-
- switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
- case NOT_READY:
- if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) {
- /* swap was successful -- the closure will run after the next
- notify_on call. */
- return;
- }
- /* swap was unsuccessful due to an intervening set_ready call.
- Fall through to the WAITING code below */
- case WAITING:
- assert(gpr_atm_acq_load(&st->state) == WAITING);
- c = &callbacks[(*ncallbacks)++];
- c->cb = st->cb;
- c->arg = st->cb_arg;
- gpr_atm_rel_store(&st->state, NOT_READY);
- return;
- case READY:
- /* duplicate ready, ignore */
- return;
- }
-}
-
-static void set_ready(grpc_fd *fd, grpc_fd_state *st,
- int allow_synchronous_callback) {
- /* only one set_ready can be active at once (but there may be a racing
- notify_on) */
- int success;
- callback cb;
- size_t ncb = 0;
- gpr_mu_lock(&fd->set_state_mu);
- set_ready_locked(st, &cb, &ncb);
- gpr_mu_unlock(&fd->set_state_mu);
- success = !gpr_atm_acq_load(&fd->shutdown);
- make_callbacks(&cb, ncb, success, allow_synchronous_callback);
-}
-
-void grpc_fd_shutdown(grpc_fd *fd) {
- callback cb[2];
- size_t ncb = 0;
- gpr_mu_lock(&fd->set_state_mu);
- GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown));
- gpr_atm_rel_store(&fd->shutdown, 1);
- set_ready_locked(&fd->readst, cb, &ncb);
- set_ready_locked(&fd->writest, cb, &ncb);
- gpr_mu_unlock(&fd->set_state_mu);
- make_callbacks(cb, ncb, 0, 0);
-}
-
-void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb,
- void *read_cb_arg) {
- notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0);
-}
-
-void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
- void *write_cb_arg) {
- notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0);
-}
-
-gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- gpr_uint32 read_mask, gpr_uint32 write_mask) {
- /* keep track of pollers that have requested our events, in case they change
- */
- gpr_mu_lock(&fd->watcher_mu);
- if (fd->watcher_capacity == fd->watcher_count) {
- fd->watcher_capacity =
- GPR_MAX(fd->watcher_capacity + 8, fd->watcher_capacity * 3 / 2);
- fd->watchers = gpr_realloc(fd->watchers,
- fd->watcher_capacity * sizeof(grpc_pollset *));
- }
- fd->watchers[fd->watcher_count++] = pollset;
- gpr_mu_unlock(&fd->watcher_mu);
-
- return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) |
- (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0);
-}
-
-void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset) {
- size_t r, w, n;
-
- gpr_mu_lock(&fd->watcher_mu);
- n = fd->watcher_count;
- for (r = 0, w = 0; r < n; r++) {
- if (fd->watchers[r] == pollset) {
- fd->watcher_count--;
- continue;
- }
- fd->watchers[w++] = fd->watchers[r];
- }
- gpr_mu_unlock(&fd->watcher_mu);
-}
-
-void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
- set_ready(fd, &fd->readst, allow_synchronous_callback);
-}
-
-void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) {
- set_ready(fd, &fd->writest, allow_synchronous_callback);
-}
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
deleted file mode 100644
index 232de0c3e0..0000000000
--- a/src/core/iomgr/fd_posix.h
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef __GRPC_INTERNAL_IOMGR_FD_POSIX_H_
-#define __GRPC_INTERNAL_IOMGR_FD_POSIX_H_
-
-#include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/pollset.h"
-#include <grpc/support/atm.h>
-#include <grpc/support/sync.h>
-#include <grpc/support/time.h>
-
-typedef struct {
- grpc_iomgr_cb_func cb;
- void *cb_arg;
- int success;
- gpr_atm state;
-} grpc_fd_state;
-
-typedef struct grpc_fd {
- int fd;
- /* refst format:
- bit0: 1=active/0=orphaned
- bit1-n: refcount
- meaning that mostly we ref by two to avoid altering the orphaned bit,
- and just unref by 1 when we're ready to flag the object as orphaned */
- gpr_atm refst;
-
- gpr_mu set_state_mu;
- gpr_atm shutdown;
-
- gpr_mu watcher_mu;
- grpc_pollset **watchers;
- size_t watcher_count;
- size_t watcher_capacity;
-
- grpc_fd_state readst;
- grpc_fd_state writest;
-
- grpc_iomgr_cb_func on_done;
- void *on_done_user_data;
-} grpc_fd;
-
-/* Create a wrapped file descriptor.
- Requires fd is a non-blocking file descriptor.
- This takes ownership of closing fd. */
-grpc_fd *grpc_fd_create(int fd);
-
-/* Releases fd to be asynchronously destroyed.
- on_done is called when the underlying file descriptor is definitely close()d.
- If on_done is NULL, no callback will be made.
- Requires: *fd initialized; no outstanding notify_on_read or
- notify_on_write. */
-void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data);
-
-/* Begin polling on an fd.
- Registers that the given pollset is interested in this fd - so that if read
- or writability interest changes, the pollset can be kicked to pick up that
- new interest.
- Return value is:
- (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
- i.e. a combination of read_mask and write_mask determined by the fd's current
- interest in said events.
- Polling strategies that do not need to alter their behavior depending on the
- fd's current interest (such as epoll) do not need to call this function. */
-gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- gpr_uint32 read_mask, gpr_uint32 write_mask);
-/* Complete polling previously started with grpc_fd_begin_poll */
-void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset);
-
-/* Return 1 if this fd is orphaned, 0 otherwise */
-int grpc_fd_is_orphaned(grpc_fd *fd);
-
-/* Cause any current callbacks to error out with GRPC_CALLBACK_CANCELLED. */
-void grpc_fd_shutdown(grpc_fd *fd);
-
-/* Register read interest, causing read_cb to be called once when fd becomes
- readable, on deadline specified by deadline, or on shutdown triggered by
- grpc_fd_shutdown.
- read_cb will be called with read_cb_arg when *fd becomes readable.
- read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable,
- GRPC_CALLBACK_TIMED_OUT if the call timed out,
- and CANCELLED if the call was cancelled.
-
- Requires:This method must not be called before the read_cb for any previous
- call runs. Edge triggered events are used whenever they are supported by the
- underlying platform. This means that users must drain fd in read_cb before
- calling notify_on_read again. Users are also expected to handle spurious
- events, i.e read_cb is called while nothing can be readable from fd */
-void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb,
- void *read_cb_arg);
-
-/* Exactly the same semantics as above, except based on writable events. */
-void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
- void *write_cb_arg);
-
-/* Notification from the poller to an fd that it has become readable or
- writable.
- If allow_synchronous_callback is 1, allow running the fd callback inline
- in this callstack, otherwise register an asynchronous callback and return */
-void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback);
-void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback);
-
-/* Reference counting for fds */
-void grpc_fd_ref(grpc_fd *fd);
-void grpc_fd_unref(grpc_fd *fd);
-
-#endif /* __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ */
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
deleted file mode 100644
index 03f56a50a3..0000000000
--- a/src/core/iomgr/iomgr.c
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/iomgr/iomgr.h"
-
-#include <stdlib.h>
-
-#include "src/core/iomgr/iomgr_internal.h"
-#include "src/core/iomgr/alarm_internal.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/thd.h>
-#include <grpc/support/sync.h>
-
-typedef struct delayed_callback {
- grpc_iomgr_cb_func cb;
- void *cb_arg;
- int success;
- struct delayed_callback *next;
-} delayed_callback;
-
-static gpr_mu g_mu;
-static gpr_cv g_cv;
-static delayed_callback *g_cbs_head = NULL;
-static delayed_callback *g_cbs_tail = NULL;
-static int g_shutdown;
-static int g_refs;
-static gpr_event g_background_callback_executor_done;
-
-/* Execute followup callbacks continuously.
- Other threads may check in and help during pollset_work() */
-static void background_callback_executor(void *ignored) {
- gpr_mu_lock(&g_mu);
- while (!g_shutdown) {
- gpr_timespec deadline = gpr_inf_future;
- if (g_cbs_head) {
- delayed_callback *cb = g_cbs_head;
- g_cbs_head = cb->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
- cb->cb(cb->cb_arg, cb->success);
- gpr_free(cb);
- gpr_mu_lock(&g_mu);
- } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) {
- } else {
- gpr_cv_wait(&g_cv, &g_mu, deadline);
- }
- }
- gpr_mu_unlock(&g_mu);
- gpr_event_set(&g_background_callback_executor_done, (void *)1);
-}
-
-void grpc_kick_poller() { gpr_cv_broadcast(&g_cv); }
-
-void grpc_iomgr_init() {
- gpr_thd_id id;
- gpr_mu_init(&g_mu);
- gpr_cv_init(&g_cv);
- grpc_alarm_list_init(gpr_now());
- g_refs = 0;
- grpc_iomgr_platform_init();
- gpr_event_init(&g_background_callback_executor_done);
- gpr_thd_new(&id, background_callback_executor, NULL, NULL);
-}
-
-void grpc_iomgr_shutdown() {
- delayed_callback *cb;
- gpr_timespec shutdown_deadline =
- gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
-
- grpc_iomgr_platform_shutdown();
-
- gpr_mu_lock(&g_mu);
- g_shutdown = 1;
- while (g_cbs_head || g_refs) {
- gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs,
- g_cbs_head ? " and executing final callbacks" : "");
- while (g_cbs_head) {
- cb = g_cbs_head;
- g_cbs_head = cb->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
-
- cb->cb(cb->cb_arg, 0);
- gpr_free(cb);
- gpr_mu_lock(&g_mu);
- }
- if (g_refs) {
- if (gpr_cv_wait(&g_cv, &g_mu, shutdown_deadline) && g_cbs_head == NULL) {
- gpr_log(GPR_DEBUG,
- "Failed to free %d iomgr objects before shutdown deadline: "
- "memory leaks are likely",
- g_refs);
- break;
- }
- }
- }
- gpr_mu_unlock(&g_mu);
-
- gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future);
-
- grpc_alarm_list_shutdown();
- gpr_mu_destroy(&g_mu);
- gpr_cv_destroy(&g_cv);
-}
-
-void grpc_iomgr_ref() {
- gpr_mu_lock(&g_mu);
- ++g_refs;
- gpr_mu_unlock(&g_mu);
-}
-
-void grpc_iomgr_unref() {
- gpr_mu_lock(&g_mu);
- if (0 == --g_refs) {
- gpr_cv_signal(&g_cv);
- }
- gpr_mu_unlock(&g_mu);
-}
-
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
- int success) {
- delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback));
- dcb->cb = cb;
- dcb->cb_arg = cb_arg;
- dcb->success = success;
- gpr_mu_lock(&g_mu);
- dcb->next = NULL;
- if (!g_cbs_tail) {
- g_cbs_head = g_cbs_tail = dcb;
- } else {
- g_cbs_tail->next = dcb;
- g_cbs_tail = dcb;
- }
- gpr_cv_signal(&g_cv);
- gpr_mu_unlock(&g_mu);
-}
-
-void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) {
- grpc_iomgr_add_delayed_callback(cb, cb_arg, 1);
-}
-
-int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
- int n = 0;
- gpr_mu *retake_mu = NULL;
- delayed_callback *cb;
- for (;;) {
- /* check for new work */
- if (!gpr_mu_trylock(&g_mu)) {
- break;
- }
- cb = g_cbs_head;
- if (!cb) {
- gpr_mu_unlock(&g_mu);
- break;
- }
- g_cbs_head = cb->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
- /* if we have a mutex to drop, do so before executing work */
- if (drop_mu) {
- gpr_mu_unlock(drop_mu);
- retake_mu = drop_mu;
- drop_mu = NULL;
- }
- cb->cb(cb->cb_arg, success && cb->success);
- gpr_free(cb);
- n++;
- }
- if (retake_mu) {
- gpr_mu_lock(retake_mu);
- }
- return n;
-}
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index 16991a9b90..cf39f947bc 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -34,8 +34,17 @@
#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_H__
#define __GRPC_INTERNAL_IOMGR_IOMGR_H__
+/* Status passed to callbacks for grpc_em_fd_notify_on_read and
+ grpc_em_fd_notify_on_write. */
+typedef enum grpc_em_cb_status {
+ GRPC_CALLBACK_SUCCESS = 0,
+ GRPC_CALLBACK_TIMED_OUT,
+ GRPC_CALLBACK_CANCELLED,
+ GRPC_CALLBACK_DO_NOT_USE
+} grpc_iomgr_cb_status;
+
/* gRPC Callback definition */
-typedef void (*grpc_iomgr_cb_func)(void *arg, int success);
+typedef void (*grpc_iomgr_cb_func)(void *arg, grpc_iomgr_cb_status status);
void grpc_iomgr_init();
void grpc_iomgr_shutdown();
diff --git a/src/core/iomgr/iomgr_posix.h b/src/core/iomgr/iomgr_completion_queue_interface.h
index ca5af3e527..3c4efe773a 100644
--- a/src/core/iomgr/iomgr_posix.h
+++ b/src/core/iomgr/iomgr_completion_queue_interface.h
@@ -31,12 +31,15 @@
*
*/
-#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_
-#define __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_
+#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_
+#define __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_
-#include "src/core/iomgr/iomgr_internal.h"
+/* Internals of iomgr that are exposed only to be used for completion queue
+ implementation */
-void grpc_pollset_global_init();
-void grpc_pollset_global_shutdown();
+extern gpr_mu grpc_iomgr_mu;
+extern gpr_cv grpc_iomgr_cv;
-#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ */
+int grpc_iomgr_work(gpr_timespec deadline);
+
+#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ */
diff --git a/src/core/iomgr/iomgr_libevent.c b/src/core/iomgr/iomgr_libevent.c
new file mode 100644
index 0000000000..6188ab2749
--- /dev/null
+++ b/src/core/iomgr/iomgr_libevent.c
@@ -0,0 +1,652 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/iomgr_libevent.h"
+
+#include <unistd.h>
+#include <fcntl.h>
+
+#include "src/core/iomgr/alarm.h"
+#include "src/core/iomgr/alarm_internal.h"
+#include <grpc/support/atm.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+#include <event2/event.h>
+#include <event2/thread.h>
+
+#define ALARM_TRIGGER_INIT ((gpr_atm)0)
+#define ALARM_TRIGGER_INCREMENT ((gpr_atm)1)
+#define DONE_SHUTDOWN ((void *)1)
+
+#define POLLER_ID_INVALID ((gpr_atm)-1)
+
+/* Global data */
+struct event_base *g_event_base;
+gpr_mu grpc_iomgr_mu;
+gpr_cv grpc_iomgr_cv;
+static grpc_libevent_activation_data *g_activation_queue;
+static int g_num_pollers;
+static int g_num_fds;
+static int g_num_address_resolutions;
+static gpr_timespec g_last_poll_completed;
+static int g_shutdown_backup_poller;
+static gpr_event g_backup_poller_done;
+/* activated to break out of the event loop early */
+static struct event *g_timeout_ev;
+/* activated to safely break polling from other threads */
+static struct event *g_break_ev;
+static grpc_fd *g_fds_to_free;
+
+int evthread_use_threads(void);
+static void grpc_fd_impl_destroy(grpc_fd *impl);
+
+void grpc_iomgr_ref_address_resolution(int delta) {
+ gpr_mu_lock(&grpc_iomgr_mu);
+ GPR_ASSERT(!g_shutdown_backup_poller);
+ g_num_address_resolutions += delta;
+ if (0 == g_num_address_resolutions) {
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ }
+ gpr_mu_unlock(&grpc_iomgr_mu);
+}
+
+/* If anything is in the work queue, process one item and return 1.
+ Return 0 if there were no work items to complete.
+ Requires grpc_iomgr_mu locked, may unlock and relock during the call. */
+static int maybe_do_queue_work() {
+ grpc_libevent_activation_data *work = g_activation_queue;
+
+ if (work == NULL) return 0;
+
+ if (work->next == work) {
+ g_activation_queue = NULL;
+ } else {
+ g_activation_queue = work->next;
+ g_activation_queue->prev = work->prev;
+ g_activation_queue->next->prev = g_activation_queue->prev->next =
+ g_activation_queue;
+ }
+ work->next = work->prev = NULL;
+ /* force status to cancelled from ok when shutting down */
+ if (g_shutdown_backup_poller && work->status == GRPC_CALLBACK_SUCCESS) {
+ work->status = GRPC_CALLBACK_CANCELLED;
+ }
+ gpr_mu_unlock(&grpc_iomgr_mu);
+
+ work->cb(work->arg, work->status);
+
+ gpr_mu_lock(&grpc_iomgr_mu);
+ return 1;
+}
+
+/* Break out of the event loop on timeout */
+static void timer_callback(int fd, short events, void *context) {
+ event_base_loopbreak((struct event_base *)context);
+}
+
+static void break_callback(int fd, short events, void *context) {
+ event_base_loopbreak((struct event_base *)context);
+}
+
+static void free_fd_list(grpc_fd *impl) {
+ while (impl != NULL) {
+ grpc_fd *current = impl;
+ impl = impl->next;
+ grpc_fd_impl_destroy(current);
+ current->on_done(current->on_done_user_data, GRPC_CALLBACK_SUCCESS);
+ gpr_free(current);
+ }
+}
+
+static void maybe_free_fds() {
+ if (g_fds_to_free) {
+ free_fd_list(g_fds_to_free);
+ g_fds_to_free = NULL;
+ }
+}
+
+void grpc_kick_poller() { event_active(g_break_ev, EV_READ, 0); }
+
+/* Spend some time doing polling and libevent maintenance work if no other
+ thread is. This includes both polling for events and destroying/closing file
+ descriptor objects.
+ Returns 1 if polling was performed, 0 otherwise.
+ Requires grpc_iomgr_mu locked, may unlock and relock during the call. */
+static int maybe_do_polling_work(struct timeval delay) {
+ int status;
+
+ if (g_num_pollers) return 0;
+
+ g_num_pollers = 1;
+
+ maybe_free_fds();
+
+ gpr_mu_unlock(&grpc_iomgr_mu);
+
+ event_add(g_timeout_ev, &delay);
+ status = event_base_loop(g_event_base, EVLOOP_ONCE);
+ if (status < 0) {
+ gpr_log(GPR_ERROR, "event polling loop stops with error status %d", status);
+ }
+ event_del(g_timeout_ev);
+
+ gpr_mu_lock(&grpc_iomgr_mu);
+ maybe_free_fds();
+
+ g_num_pollers = 0;
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ return 1;
+}
+
+static int maybe_do_alarm_work(gpr_timespec now, gpr_timespec next) {
+ int r = 0;
+ if (gpr_time_cmp(next, now) < 0) {
+ gpr_mu_unlock(&grpc_iomgr_mu);
+ r = grpc_alarm_check(now);
+ gpr_mu_lock(&grpc_iomgr_mu);
+ }
+ return r;
+}
+
+int grpc_iomgr_work(gpr_timespec deadline) {
+ gpr_timespec now = gpr_now();
+ gpr_timespec next = grpc_alarm_list_next_timeout();
+ gpr_timespec delay_timespec = gpr_time_sub(deadline, now);
+ /* poll for no longer than one second */
+ gpr_timespec max_delay = gpr_time_from_seconds(1);
+ struct timeval delay;
+
+ if (gpr_time_cmp(delay_timespec, gpr_time_0) <= 0) {
+ return 0;
+ }
+
+ if (gpr_time_cmp(delay_timespec, max_delay) > 0) {
+ delay_timespec = max_delay;
+ }
+
+ /* Adjust delay to account for the next alarm, if applicable. */
+ delay_timespec = gpr_time_min(
+ delay_timespec, gpr_time_sub(grpc_alarm_list_next_timeout(), now));
+
+ delay = gpr_timeval_from_timespec(delay_timespec);
+
+ if (maybe_do_queue_work() || maybe_do_alarm_work(now, next) ||
+ maybe_do_polling_work(delay)) {
+ g_last_poll_completed = gpr_now();
+ return 1;
+ }
+
+ return 0;
+}
+
+static void backup_poller_thread(void *p) {
+ int backup_poller_engaged = 0;
+ /* allow no pollers for 100 milliseconds, then engage backup polling */
+ gpr_timespec allow_no_pollers = gpr_time_from_millis(100);
+
+ gpr_mu_lock(&grpc_iomgr_mu);
+ while (!g_shutdown_backup_poller) {
+ if (g_num_pollers == 0) {
+ gpr_timespec now = gpr_now();
+ gpr_timespec time_until_engage = gpr_time_sub(
+ allow_no_pollers, gpr_time_sub(now, g_last_poll_completed));
+ if (gpr_time_cmp(time_until_engage, gpr_time_0) <= 0) {
+ if (!backup_poller_engaged) {
+ gpr_log(GPR_DEBUG, "No pollers for a while - engaging backup poller");
+ backup_poller_engaged = 1;
+ }
+ if (!maybe_do_queue_work()) {
+ gpr_timespec next = grpc_alarm_list_next_timeout();
+ if (!maybe_do_alarm_work(now, next)) {
+ gpr_timespec deadline =
+ gpr_time_min(next, gpr_time_add(now, gpr_time_from_seconds(1)));
+ maybe_do_polling_work(
+ gpr_timeval_from_timespec(gpr_time_sub(deadline, now)));
+ }
+ }
+ } else {
+ if (backup_poller_engaged) {
+ gpr_log(GPR_DEBUG, "Backup poller disengaged");
+ backup_poller_engaged = 0;
+ }
+ gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_sleep_until(gpr_time_add(now, time_until_engage));
+ gpr_mu_lock(&grpc_iomgr_mu);
+ }
+ } else {
+ if (backup_poller_engaged) {
+ gpr_log(GPR_DEBUG, "Backup poller disengaged");
+ backup_poller_engaged = 0;
+ }
+ gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, gpr_inf_future);
+ }
+ }
+ gpr_mu_unlock(&grpc_iomgr_mu);
+
+ gpr_event_set(&g_backup_poller_done, (void *)1);
+}
+
+void grpc_iomgr_init() {
+ gpr_thd_id backup_poller_id;
+
+ if (evthread_use_threads() != 0) {
+ gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!");
+ abort();
+ }
+
+ grpc_alarm_list_init(gpr_now());
+
+ gpr_mu_init(&grpc_iomgr_mu);
+ gpr_cv_init(&grpc_iomgr_cv);
+ g_activation_queue = NULL;
+ g_num_pollers = 0;
+ g_num_fds = 0;
+ g_num_address_resolutions = 0;
+ g_last_poll_completed = gpr_now();
+ g_shutdown_backup_poller = 0;
+ g_fds_to_free = NULL;
+
+ gpr_event_init(&g_backup_poller_done);
+
+ g_event_base = NULL;
+ g_timeout_ev = NULL;
+ g_break_ev = NULL;
+
+ g_event_base = event_base_new();
+ if (!g_event_base) {
+ gpr_log(GPR_ERROR, "Failed to create the event base");
+ abort();
+ }
+
+ if (evthread_make_base_notifiable(g_event_base) != 0) {
+ gpr_log(GPR_ERROR, "Couldn't make event base notifiable cross threads!");
+ abort();
+ }
+
+ g_timeout_ev = evtimer_new(g_event_base, timer_callback, g_event_base);
+ g_break_ev = event_new(g_event_base, -1, EV_READ | EV_PERSIST, break_callback,
+ g_event_base);
+
+ event_add(g_break_ev, NULL);
+
+ gpr_thd_new(&backup_poller_id, backup_poller_thread, NULL, NULL);
+}
+
+void grpc_iomgr_shutdown() {
+ gpr_timespec fd_shutdown_deadline =
+ gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
+
+ /* broadcast shutdown */
+ gpr_mu_lock(&grpc_iomgr_mu);
+ while (g_num_fds > 0 || g_num_address_resolutions > 0) {
+ gpr_log(GPR_INFO,
+ "waiting for %d fds and %d name resolutions to be destroyed before "
+ "closing event manager",
+ g_num_fds, g_num_address_resolutions);
+ if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, fd_shutdown_deadline)) {
+ gpr_log(GPR_ERROR,
+ "not all fds or name resolutions destroyed before shutdown "
+ "deadline: memory leaks "
+ "are likely");
+ break;
+ } else if (g_num_fds == 0 && g_num_address_resolutions == 0) {
+ gpr_log(GPR_INFO, "all fds closed, all name resolutions finished");
+ }
+ }
+
+ g_shutdown_backup_poller = 1;
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_mu_unlock(&grpc_iomgr_mu);
+
+ gpr_event_wait(&g_backup_poller_done, gpr_inf_future);
+
+ grpc_alarm_list_shutdown();
+
+ /* drain pending work */
+ gpr_mu_lock(&grpc_iomgr_mu);
+ while (maybe_do_queue_work())
+ ;
+ gpr_mu_unlock(&grpc_iomgr_mu);
+
+ free_fd_list(g_fds_to_free);
+
+ /* complete shutdown */
+ gpr_mu_destroy(&grpc_iomgr_mu);
+ gpr_cv_destroy(&grpc_iomgr_cv);
+
+ if (g_timeout_ev != NULL) {
+ event_free(g_timeout_ev);
+ }
+
+ if (g_break_ev != NULL) {
+ event_free(g_break_ev);
+ }
+
+ if (g_event_base != NULL) {
+ event_base_free(g_event_base);
+ g_event_base = NULL;
+ }
+}
+
+static void add_task(grpc_libevent_activation_data *adata) {
+ gpr_mu_lock(&grpc_iomgr_mu);
+ if (g_activation_queue) {
+ adata->next = g_activation_queue;
+ adata->prev = adata->next->prev;
+ adata->next->prev = adata->prev->next = adata;
+ } else {
+ g_activation_queue = adata;
+ adata->next = adata->prev = adata;
+ }
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_mu_unlock(&grpc_iomgr_mu);
+}
+
+static void grpc_fd_impl_destroy(grpc_fd *impl) {
+ grpc_em_task_activity_type type;
+ grpc_libevent_activation_data *adata;
+
+ for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) {
+ adata = &(impl->task.activation[type]);
+ GPR_ASSERT(adata->next == NULL);
+ if (adata->ev != NULL) {
+ event_free(adata->ev);
+ adata->ev = NULL;
+ }
+ }
+
+ if (impl->shutdown_ev != NULL) {
+ event_free(impl->shutdown_ev);
+ impl->shutdown_ev = NULL;
+ }
+ gpr_mu_destroy(&impl->mu);
+ close(impl->fd);
+}
+
+/* Proxy callback to call a gRPC read/write callback */
+static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) {
+ grpc_fd *em_fd = arg;
+ grpc_iomgr_cb_status status = GRPC_CALLBACK_SUCCESS;
+ int run_read_cb = 0;
+ int run_write_cb = 0;
+ grpc_libevent_activation_data *rdata, *wdata;
+
+ gpr_mu_lock(&em_fd->mu);
+ if (em_fd->shutdown_started) {
+ status = GRPC_CALLBACK_CANCELLED;
+ } else if (status == GRPC_CALLBACK_SUCCESS && (what & EV_TIMEOUT)) {
+ status = GRPC_CALLBACK_TIMED_OUT;
+ /* TODO(klempner): This is broken if we are monitoring both read and write
+ events on the same fd -- generating a spurious event is okay, but
+ generating a spurious timeout is not. */
+ what |= (EV_READ | EV_WRITE);
+ }
+
+ if (what & EV_READ) {
+ switch (em_fd->read_state) {
+ case GRPC_FD_WAITING:
+ run_read_cb = 1;
+ em_fd->read_state = GRPC_FD_IDLE;
+ break;
+ case GRPC_FD_IDLE:
+ case GRPC_FD_CACHED:
+ em_fd->read_state = GRPC_FD_CACHED;
+ }
+ }
+ if (what & EV_WRITE) {
+ switch (em_fd->write_state) {
+ case GRPC_FD_WAITING:
+ run_write_cb = 1;
+ em_fd->write_state = GRPC_FD_IDLE;
+ break;
+ case GRPC_FD_IDLE:
+ case GRPC_FD_CACHED:
+ em_fd->write_state = GRPC_FD_CACHED;
+ }
+ }
+
+ if (run_read_cb) {
+ rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]);
+ rdata->status = status;
+ add_task(rdata);
+ } else if (run_write_cb) {
+ wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]);
+ wdata->status = status;
+ add_task(wdata);
+ }
+ gpr_mu_unlock(&em_fd->mu);
+}
+
+static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) {
+ /* TODO(klempner): This could just run directly in the calling thread, except
+ that libevent's handling of event_active() on an event which is already in
+ flight on a different thread is racy and easily triggers TSAN.
+ */
+ grpc_fd *impl = arg;
+ gpr_mu_lock(&impl->mu);
+ impl->shutdown_started = 1;
+ if (impl->read_state == GRPC_FD_WAITING) {
+ event_active(impl->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1);
+ }
+ if (impl->write_state == GRPC_FD_WAITING) {
+ event_active(impl->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1);
+ }
+ gpr_mu_unlock(&impl->mu);
+}
+
+grpc_fd *grpc_fd_create(int fd) {
+ int flags;
+ grpc_libevent_activation_data *rdata, *wdata;
+ grpc_fd *impl = gpr_malloc(sizeof(grpc_fd));
+
+ gpr_mu_lock(&grpc_iomgr_mu);
+ g_num_fds++;
+ gpr_mu_unlock(&grpc_iomgr_mu);
+
+ impl->shutdown_ev = NULL;
+ gpr_mu_init(&impl->mu);
+
+ flags = fcntl(fd, F_GETFL, 0);
+ GPR_ASSERT((flags & O_NONBLOCK) != 0);
+
+ impl->task.type = GRPC_EM_TASK_FD;
+ impl->fd = fd;
+
+ rdata = &(impl->task.activation[GRPC_EM_TA_READ]);
+ rdata->ev = NULL;
+ rdata->cb = NULL;
+ rdata->arg = NULL;
+ rdata->status = GRPC_CALLBACK_SUCCESS;
+ rdata->prev = NULL;
+ rdata->next = NULL;
+
+ wdata = &(impl->task.activation[GRPC_EM_TA_WRITE]);
+ wdata->ev = NULL;
+ wdata->cb = NULL;
+ wdata->arg = NULL;
+ wdata->status = GRPC_CALLBACK_SUCCESS;
+ wdata->prev = NULL;
+ wdata->next = NULL;
+
+ impl->read_state = GRPC_FD_IDLE;
+ impl->write_state = GRPC_FD_IDLE;
+
+ impl->shutdown_started = 0;
+ impl->next = NULL;
+
+ /* TODO(chenw): detect platforms where only level trigger is supported,
+ and set the event to non-persist. */
+ rdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ,
+ em_fd_cb, impl);
+ GPR_ASSERT(rdata->ev);
+
+ wdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE,
+ em_fd_cb, impl);
+ GPR_ASSERT(wdata->ev);
+
+ impl->shutdown_ev =
+ event_new(g_event_base, -1, EV_READ, em_fd_shutdown_cb, impl);
+ GPR_ASSERT(impl->shutdown_ev);
+
+ return impl;
+}
+
+static void do_nothing(void *ignored, grpc_iomgr_cb_status also_ignored) {}
+
+void grpc_fd_destroy(grpc_fd *impl, grpc_iomgr_cb_func on_done,
+ void *user_data) {
+ if (on_done == NULL) on_done = do_nothing;
+
+ gpr_mu_lock(&grpc_iomgr_mu);
+
+ /* Put the impl on the list to be destroyed by the poller. */
+ impl->on_done = on_done;
+ impl->on_done_user_data = user_data;
+ impl->next = g_fds_to_free;
+ g_fds_to_free = impl;
+ /* TODO(ctiller): kick the poller so it destroys this fd promptly
+ (currently we may wait up to a second) */
+
+ g_num_fds--;
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_mu_unlock(&grpc_iomgr_mu);
+}
+
+int grpc_fd_get(struct grpc_fd *em_fd) { return em_fd->fd; }
+
+/* TODO(chenw): should we enforce the contract that notify_on_read cannot be
+ called when the previously registered callback has not been called yet. */
+int grpc_fd_notify_on_read(grpc_fd *impl, grpc_iomgr_cb_func read_cb,
+ void *read_cb_arg, gpr_timespec deadline) {
+ int force_event = 0;
+ grpc_libevent_activation_data *rdata;
+ gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
+ struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
+ struct timeval *delayp =
+ gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL;
+
+ rdata = &impl->task.activation[GRPC_EM_TA_READ];
+
+ gpr_mu_lock(&impl->mu);
+ rdata->cb = read_cb;
+ rdata->arg = read_cb_arg;
+
+ force_event = (impl->shutdown_started || impl->read_state == GRPC_FD_CACHED);
+ impl->read_state = GRPC_FD_WAITING;
+
+ if (force_event) {
+ event_active(rdata->ev, EV_READ, 1);
+ } else if (event_add(rdata->ev, delayp) == -1) {
+ gpr_mu_unlock(&impl->mu);
+ return 0;
+ }
+ gpr_mu_unlock(&impl->mu);
+ return 1;
+}
+
+int grpc_fd_notify_on_write(grpc_fd *impl, grpc_iomgr_cb_func write_cb,
+ void *write_cb_arg, gpr_timespec deadline) {
+ int force_event = 0;
+ grpc_libevent_activation_data *wdata;
+ gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
+ struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
+ struct timeval *delayp =
+ gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL;
+
+ wdata = &impl->task.activation[GRPC_EM_TA_WRITE];
+
+ gpr_mu_lock(&impl->mu);
+ wdata->cb = write_cb;
+ wdata->arg = write_cb_arg;
+
+ force_event = (impl->shutdown_started || impl->write_state == GRPC_FD_CACHED);
+ impl->write_state = GRPC_FD_WAITING;
+
+ if (force_event) {
+ event_active(wdata->ev, EV_WRITE, 1);
+ } else if (event_add(wdata->ev, delayp) == -1) {
+ gpr_mu_unlock(&impl->mu);
+ return 0;
+ }
+ gpr_mu_unlock(&impl->mu);
+ return 1;
+}
+
+void grpc_fd_shutdown(grpc_fd *em_fd) {
+ event_active(em_fd->shutdown_ev, EV_READ, 1);
+}
+
+/* Sometimes we want a followup callback: something to be added from the
+ current callback for the EM to invoke once this callback is complete.
+ This is implemented by inserting an entry into an EM queue. */
+
+/* The following structure holds the field needed for adding the
+ followup callback. These are the argument for the followup callback,
+ the function to use for the followup callback, and the
+ activation data pointer used for the queues (to free in the CB) */
+struct followup_callback_arg {
+ grpc_iomgr_cb_func func;
+ void *cb_arg;
+ grpc_libevent_activation_data adata;
+};
+
+static void followup_proxy_callback(void *cb_arg, grpc_iomgr_cb_status status) {
+ struct followup_callback_arg *fcb_arg = cb_arg;
+ /* Invoke the function */
+ fcb_arg->func(fcb_arg->cb_arg, status);
+ gpr_free(fcb_arg);
+}
+
+void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) {
+ grpc_libevent_activation_data *adptr;
+ struct followup_callback_arg *fcb_arg;
+
+ fcb_arg = gpr_malloc(sizeof(*fcb_arg));
+ /* Set up the activation data and followup callback argument structures */
+ adptr = &fcb_arg->adata;
+ adptr->ev = NULL;
+ adptr->cb = followup_proxy_callback;
+ adptr->arg = fcb_arg;
+ adptr->status = GRPC_CALLBACK_SUCCESS;
+ adptr->prev = NULL;
+ adptr->next = NULL;
+
+ fcb_arg->func = cb;
+ fcb_arg->cb_arg = cb_arg;
+
+ /* Insert an activation data for the specified em */
+ add_task(adptr);
+}
diff --git a/src/core/iomgr/iomgr_libevent.h b/src/core/iomgr/iomgr_libevent.h
new file mode 100644
index 0000000000..5c088006a0
--- /dev/null
+++ b/src/core/iomgr/iomgr_libevent.h
@@ -0,0 +1,206 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__
+#define __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__
+
+#include "src/core/iomgr/iomgr.h"
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+
+typedef struct grpc_fd grpc_fd;
+
+/* gRPC event manager task "base class". This is pretend-inheritance in C89.
+ This should be the first member of any actual grpc_em task type.
+
+ Memory warning: expanding this will increase memory usage in any derived
+ class, so be careful.
+
+ For generality, this base can be on multiple task queues and can have
+ multiple event callbacks registered. Not all "derived classes" will use
+ this feature. */
+
+typedef enum grpc_libevent_task_type {
+ GRPC_EM_TASK_ALARM,
+ GRPC_EM_TASK_FD,
+ GRPC_EM_TASK_DO_NOT_USE
+} grpc_libevent_task_type;
+
+/* Different activity types to shape the callback and queueing arrays */
+typedef enum grpc_em_task_activity_type {
+ GRPC_EM_TA_READ, /* use this also for single-type events */
+ GRPC_EM_TA_WRITE,
+ GRPC_EM_TA_COUNT
+} grpc_em_task_activity_type;
+
+/* Include the following #define for convenience for tasks like alarms that
+ only have a single type */
+#define GRPC_EM_TA_ONLY GRPC_EM_TA_READ
+
+typedef struct grpc_libevent_activation_data {
+ struct event *ev; /* event activated on this callback type */
+ grpc_iomgr_cb_func cb; /* function pointer for callback */
+ void *arg; /* argument passed to cb */
+
+ /* Hold the status associated with the callback when queued */
+ grpc_iomgr_cb_status status;
+ /* Now set up to link activations into scheduler queues */
+ struct grpc_libevent_activation_data *prev;
+ struct grpc_libevent_activation_data *next;
+} grpc_libevent_activation_data;
+
+typedef struct grpc_libevent_task {
+ grpc_libevent_task_type type;
+
+ /* Now have an array of activation data elements: one for each activity
+ type that could get activated */
+ grpc_libevent_activation_data activation[GRPC_EM_TA_COUNT];
+} grpc_libevent_task;
+
+/* Initialize *em_fd.
+ Requires fd is a non-blocking file descriptor.
+
+ This takes ownership of closing fd.
+
+ Requires: *em_fd uninitialized. fd is a non-blocking file descriptor. */
+grpc_fd *grpc_fd_create(int fd);
+
+/* Cause *em_fd no longer to be initialized and closes the underlying fd.
+ on_done is called when the underlying file descriptor is definitely close()d.
+ If on_done is NULL, no callback will be made.
+ Requires: *em_fd initialized; no outstanding notify_on_read or
+ notify_on_write. */
+void grpc_fd_destroy(grpc_fd *em_fd, grpc_iomgr_cb_func on_done,
+ void *user_data);
+
+/* Returns the file descriptor associated with *em_fd. */
+int grpc_fd_get(grpc_fd *em_fd);
+
+/* Register read interest, causing read_cb to be called once when em_fd becomes
+ readable, on deadline specified by deadline, or on shutdown triggered by
+ grpc_fd_shutdown.
+ read_cb will be called with read_cb_arg when *em_fd becomes readable.
+ read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable,
+ GRPC_CALLBACK_TIMED_OUT if the call timed out,
+ and CANCELLED if the call was cancelled.
+
+ Requires:This method must not be called before the read_cb for any previous
+ call runs. Edge triggered events are used whenever they are supported by the
+ underlying platform. This means that users must drain em_fd in read_cb before
+ calling notify_on_read again. Users are also expected to handle spurious
+ events, i.e read_cb is called while nothing can be readable from em_fd */
+int grpc_fd_notify_on_read(grpc_fd *em_fd, grpc_iomgr_cb_func read_cb,
+ void *read_cb_arg, gpr_timespec deadline);
+
+/* Exactly the same semantics as above, except based on writable events. */
+int grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
+ void *write_cb_arg, gpr_timespec deadline);
+
+/* Cause any current and all future read/write callbacks to error out with
+ GRPC_CALLBACK_CANCELLED. */
+void grpc_fd_shutdown(grpc_fd *em_fd);
+
+/* =================== Event caching ===================
+ In order to not miss or double-return edges in the context of edge triggering
+ and multithreading, we need a per-fd caching layer in the eventmanager itself
+ to cache relevant events.
+
+ There are two types of events we care about: calls to notify_on_[read|write]
+ and readable/writable events for the socket from eventfd. There are separate
+ event caches for read and write.
+
+ There are three states:
+ 0. "waiting" -- There's been a call to notify_on_[read|write] which has not
+ had a corresponding event. In other words, we're waiting for an event so we
+ can run the callback.
+ 1. "idle" -- We are neither waiting nor have a cached event.
+ 2. "cached" -- There has been a read/write event without a waiting callback,
+ so we want to run the event next time the application calls
+ notify_on_[read|write].
+
+ The high level state diagram:
+
+ +--------------------------------------------------------------------+
+ | WAITING | IDLE | CACHED |
+ | | | |
+ | 1. --*-> 2. --+-> 3. --+\
+ | | | <--+/
+ | | | |
+ x+-- 6. 5. <-+-- 4. <-*-- |
+ | | | |
+ +--------------------------------------------------------------------+
+
+ Transitions right occur on read|write events. Transitions left occur on
+ notify_on_[read|write] events.
+ State transitions:
+ 1. Read|Write event while waiting -> run the callback and transition to idle.
+ 2. Read|Write event while idle -> transition to cached.
+ 3. Read|Write event with one already cached -> still cached.
+ 4. notify_on_[read|write] with event cached: run callback and transition to
+ idle.
+ 5. notify_on_[read|write] when idle: Store callback and transition to
+ waiting.
+ 6. notify_on_[read|write] when waiting: invalid. */
+
+typedef enum grpc_fd_state {
+ GRPC_FD_WAITING = 0,
+ GRPC_FD_IDLE = 1,
+ GRPC_FD_CACHED = 2
+} grpc_fd_state;
+
+/* gRPC file descriptor handle.
+ The handle is used to register read/write callbacks to a file descriptor */
+struct grpc_fd {
+ grpc_libevent_task task; /* Base class, callbacks, queues, etc */
+ int fd; /* File descriptor */
+
+ /* Note that the shutdown event is only needed as a workaround for libevent
+ not properly handling event_active on an in flight event. */
+ struct event *shutdown_ev; /* activated to trigger shutdown */
+
+ /* protect shutdown_started|read_state|write_state and ensure barriers
+ between notify_on_[read|write] and read|write callbacks */
+ gpr_mu mu;
+ int shutdown_started; /* 0 -> shutdown not started, 1 -> started */
+ grpc_fd_state read_state;
+ grpc_fd_state write_state;
+
+ /* descriptor delete list. These are destroyed during polling. */
+ struct grpc_fd *next;
+ grpc_iomgr_cb_func on_done;
+ void *on_done_user_data;
+};
+
+void grpc_iomgr_ref_address_resolution(int delta);
+
+#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */
diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_libevent_use_threads.c
index 5f72542777..af449342f0 100644
--- a/src/core/iomgr/iomgr_internal.h
+++ b/src/core/iomgr/iomgr_libevent_use_threads.c
@@ -31,21 +31,26 @@
*
*/
-#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_
-#define __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_
-
-#include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/iomgr_internal.h"
+/* Posix grpc event manager support code. */
+#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include <event2/thread.h>
-int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success);
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
- int success);
-
-void grpc_iomgr_ref();
-void grpc_iomgr_unref();
-
-void grpc_iomgr_platform_init();
-void grpc_iomgr_platform_shutdown();
+static int error_code = 0;
+static gpr_once threads_once = GPR_ONCE_INIT;
+static void evthread_threads_initialize(void) {
+ error_code = evthread_use_pthreads();
+ if (error_code) {
+ gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!");
+ }
+}
-#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ */
+/* Notify LibEvent that Posix pthread is used. */
+int evthread_use_threads() {
+ gpr_once_init(&threads_once, &evthread_threads_initialize);
+ /* For Pthreads or Windows threads, Libevent provides simple APIs to set
+ mutexes and conditional variables to support cross thread operations.
+ For other platforms, LibEvent provide callback APIs to hook mutexes and
+ conditional variables. */
+ return error_code;
+}
diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/pollset.c
index ff9195ec1d..62a0019eb3 100644
--- a/src/core/iomgr/iomgr_posix.c
+++ b/src/core/iomgr/pollset.c
@@ -31,8 +31,7 @@
*
*/
-#include "src/core/iomgr/iomgr_posix.h"
+#include "src/core/iomgr/pollset.h"
-void grpc_iomgr_platform_init() { grpc_pollset_global_init(); }
-
-void grpc_iomgr_platform_shutdown() { grpc_pollset_global_shutdown(); }
+void grpc_pollset_init(grpc_pollset *pollset) { pollset->unused = 0; }
+void grpc_pollset_destroy(grpc_pollset *pollset) {}
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index 7374a4ec13..ba1a9d5429 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -34,31 +34,18 @@
#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_H_
#define __GRPC_INTERNAL_IOMGR_POLLSET_H_
-#include <grpc/support/port_platform.h>
-
/* A grpc_pollset is a set of file descriptors that a higher level item is
interested in. For example:
- a server will typically keep a pollset containing all connected channels,
so that it can find new calls to service
- a completion queue might keep a pollset with an entry for each transport
that is servicing a call that it's tracking */
-
-#ifdef GPR_POSIX_SOCKET
-#include "src/core/iomgr/pollset_posix.h"
-#endif
+/* Eventually different implementations of iomgr will provide their own
+ grpc_pollset structs. As this is just a dummy wrapper to get the API in,
+ we just define a simple type here. */
+typedef struct { char unused; } grpc_pollset;
void grpc_pollset_init(grpc_pollset *pollset);
void grpc_pollset_destroy(grpc_pollset *pollset);
-/* Do some work on a pollset.
- May involve invoking asynchronous callbacks, or actually polling file
- descriptors.
- Requires GRPC_POLLSET_MU(pollset) locked.
- May unlock GRPC_POLLSET_MU(pollset) during its execution. */
-int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline);
-
-/* Break a pollset out of polling work
- Requires GRPC_POLLSET_MU(pollset) locked. */
-void grpc_pollset_kick(grpc_pollset *pollset);
-
#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_H_ */
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
deleted file mode 100644
index e482da94f7..0000000000
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_POSIX_MULTIPOLL_WITH_POLL
-
-#include "src/core/iomgr/pollset_posix.h"
-
-#include <errno.h>
-#include <poll.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include "src/core/iomgr/fd_posix.h"
-#include "src/core/iomgr/iomgr_internal.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/useful.h>
-
-typedef struct {
- /* all polled fds */
- size_t fd_count;
- size_t fd_capacity;
- grpc_fd **fds;
- /* fds being polled by the current poller: parallel arrays of pollfd and the
- * grpc_fd* that the pollfd was constructed from */
- size_t pfd_count;
- size_t pfd_capacity;
- grpc_fd **selfds;
- struct pollfd *pfds;
- /* fds that have been removed from the pollset explicitly */
- size_t del_count;
- size_t del_capacity;
- grpc_fd **dels;
-} pollset_hdr;
-
-static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset,
- grpc_fd *fd) {
- size_t i;
- pollset_hdr *h = pollset->data.ptr;
- /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
- for (i = 0; i < h->fd_count; i++) {
- if (h->fds[i] == fd) return;
- }
- if (h->fd_count == h->fd_capacity) {
- h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2);
- h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity);
- }
- h->fds[h->fd_count++] = fd;
- grpc_fd_ref(fd);
-}
-
-static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
- grpc_fd *fd) {
- /* will get removed next poll cycle */
- pollset_hdr *h = pollset->data.ptr;
- if (h->del_count == h->del_capacity) {
- h->del_capacity = GPR_MAX(h->del_capacity + 8, h->del_count * 3 / 2);
- h->dels = gpr_realloc(h->dels, sizeof(grpc_fd *) * h->del_capacity);
- }
- h->dels[h->del_count++] = fd;
- grpc_fd_ref(fd);
-}
-
-static void end_polling(grpc_pollset *pollset) {
- size_t i;
- pollset_hdr *h;
- h = pollset->data.ptr;
- for (i = 1; i < h->pfd_count; i++) {
- grpc_fd_end_poll(h->selfds[i], pollset);
- }
-}
-
-static int multipoll_with_poll_pollset_maybe_work(
- grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
- int timeout;
- int r;
- size_t i, np, nf, nd;
- pollset_hdr *h;
-
- if (pollset->counter) {
- return 0;
- }
- h = pollset->data.ptr;
- if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
- timeout = -1;
- } else {
- timeout = gpr_time_to_millis(gpr_time_sub(deadline, now));
- if (timeout <= 0) {
- return 1;
- }
- }
- if (h->pfd_capacity < h->fd_count + 1) {
- h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1);
- gpr_free(h->pfds);
- gpr_free(h->selfds);
- h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity);
- h->selfds = gpr_malloc(sizeof(grpc_fd *) * h->pfd_capacity);
- }
- nf = 0;
- np = 1;
- h->pfds[0].fd = grpc_kick_read_fd(pollset);
- h->pfds[0].events = POLLIN;
- h->pfds[0].revents = POLLOUT;
- for (i = 0; i < h->fd_count; i++) {
- int remove = grpc_fd_is_orphaned(h->fds[i]);
- for (nd = 0; nd < h->del_count; nd++) {
- if (h->fds[i] == h->dels[nd]) remove = 1;
- }
- if (remove) {
- grpc_fd_unref(h->fds[i]);
- } else {
- h->fds[nf++] = h->fds[i];
- h->pfds[np].events =
- grpc_fd_begin_poll(h->fds[i], pollset, POLLIN, POLLOUT);
- h->selfds[np] = h->fds[i];
- h->pfds[np].fd = h->fds[i]->fd;
- h->pfds[np].revents = 0;
- np++;
- }
- }
- h->pfd_count = np;
- h->fd_count = nf;
- for (nd = 0; nd < h->del_count; nd++) {
- grpc_fd_unref(h->dels[nd]);
- }
- h->del_count = 0;
- if (h->pfd_count == 0) {
- end_polling(pollset);
- return 0;
- }
- pollset->counter = 1;
- gpr_mu_unlock(&pollset->mu);
-
- r = poll(h->pfds, h->pfd_count, timeout);
- if (r < 0) {
- if (errno != EINTR) {
- gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
- }
- } else if (r == 0) {
- /* do nothing */
- } else {
- if (h->pfds[0].revents & POLLIN) {
- grpc_kick_drain(pollset);
- }
- for (i = 1; i < np; i++) {
- if (h->pfds[i].revents & POLLIN) {
- grpc_fd_become_readable(h->selfds[i], allow_synchronous_callback);
- }
- if (h->pfds[i].revents & POLLOUT) {
- grpc_fd_become_writable(h->selfds[i], allow_synchronous_callback);
- }
- }
- }
- end_polling(pollset);
-
- gpr_mu_lock(&pollset->mu);
- pollset->counter = 0;
- gpr_cv_broadcast(&pollset->cv);
- return 1;
-}
-
-static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
- size_t i;
- pollset_hdr *h = pollset->data.ptr;
- GPR_ASSERT(pollset->counter == 0);
- for (i = 0; i < h->fd_count; i++) {
- grpc_fd_unref(h->fds[i]);
- }
- for (i = 0; i < h->del_count; i++) {
- grpc_fd_unref(h->dels[i]);
- }
- gpr_free(h->pfds);
- gpr_free(h->selfds);
- gpr_free(h->fds);
- gpr_free(h->dels);
- gpr_free(h);
-}
-
-static const grpc_pollset_vtable multipoll_with_poll_pollset = {
- multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd,
- multipoll_with_poll_pollset_maybe_work,
- multipoll_with_poll_pollset_destroy};
-
-void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
- size_t nfds) {
- size_t i;
- pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
- pollset->vtable = &multipoll_with_poll_pollset;
- pollset->data.ptr = h;
- h->fd_count = nfds;
- h->fd_capacity = nfds;
- h->fds = gpr_malloc(nfds * sizeof(grpc_fd *));
- h->pfd_count = 0;
- h->pfd_capacity = 0;
- h->pfds = NULL;
- h->selfds = NULL;
- h->del_count = 0;
- h->del_capacity = 0;
- h->dels = NULL;
- for (i = 0; i < nfds; i++) {
- h->fds[i] = fds[i];
- grpc_fd_ref(fds[i]);
- }
-}
-
-#endif
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
deleted file mode 100644
index ff00e06429..0000000000
--- a/src/core/iomgr/pollset_posix.c
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/iomgr/pollset_posix.h"
-
-#include <errno.h>
-#include <poll.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-#include "src/core/iomgr/alarm_internal.h"
-#include "src/core/iomgr/fd_posix.h"
-#include "src/core/iomgr/iomgr_internal.h"
-#include "src/core/iomgr/socket_utils_posix.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/thd.h>
-#include <grpc/support/useful.h>
-
-/* kick pipes: we keep a sharded set of pipes to allow breaking from poll.
- Ideally this would be 1:1 with pollsets, but we'd like to avoid associating
- full kernel objects with each pollset to keep them lightweight, so instead
- keep a sharded set and allow associating a pollset with one of the shards.
-
- TODO(ctiller): move this out from this file, and allow an eventfd
- implementation on linux */
-
-#define LOG2_KICK_SHARDS 6
-#define KICK_SHARDS (1 << LOG2_KICK_SHARDS)
-
-static int g_kick_pipes[KICK_SHARDS][2];
-static grpc_pollset g_backup_pollset;
-static int g_shutdown_backup_poller;
-static gpr_event g_backup_poller_done;
-
-static void backup_poller(void *p) {
- gpr_timespec delta = gpr_time_from_millis(100);
- gpr_timespec last_poll = gpr_now();
-
- gpr_mu_lock(&g_backup_pollset.mu);
- while (g_shutdown_backup_poller == 0) {
- gpr_timespec next_poll = gpr_time_add(last_poll, delta);
- grpc_pollset_work(&g_backup_pollset, next_poll);
- gpr_mu_unlock(&g_backup_pollset.mu);
- gpr_sleep_until(next_poll);
- gpr_mu_lock(&g_backup_pollset.mu);
- last_poll = next_poll;
- }
- gpr_mu_unlock(&g_backup_pollset.mu);
-
- gpr_event_set(&g_backup_poller_done, (void *)1);
-}
-
-static size_t kick_shard(const grpc_pollset *info) {
- size_t x = (size_t)info;
- return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (KICK_SHARDS - 1);
-}
-
-int grpc_kick_read_fd(grpc_pollset *p) {
- return g_kick_pipes[kick_shard(p)][0];
-}
-
-static int grpc_kick_write_fd(grpc_pollset *p) {
- return g_kick_pipes[kick_shard(p)][1];
-}
-
-void grpc_pollset_force_kick(grpc_pollset *p) {
- char c = 0;
- while (write(grpc_kick_write_fd(p), &c, 1) != 1 && errno == EINTR)
- ;
-}
-
-void grpc_pollset_kick(grpc_pollset *p) {
- if (!p->counter) return;
- grpc_pollset_force_kick(p);
-}
-
-void grpc_kick_drain(grpc_pollset *p) {
- int fd = grpc_kick_read_fd(p);
- char buf[128];
- int r;
-
- for (;;) {
- r = read(fd, buf, sizeof(buf));
- if (r > 0) continue;
- if (r == 0) return;
- switch (errno) {
- case EAGAIN:
- return;
- case EINTR:
- continue;
- default:
- gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno));
- return;
- }
- }
-}
-
-/* global state management */
-
-grpc_pollset *grpc_backup_pollset() { return &g_backup_pollset; }
-
-void grpc_pollset_global_init() {
- int i;
- gpr_thd_id id;
-
- /* initialize the kick shards */
- for (i = 0; i < KICK_SHARDS; i++) {
- GPR_ASSERT(0 == pipe(g_kick_pipes[i]));
- GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][0], 1));
- GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][1], 1));
- }
-
- /* initialize the backup pollset */
- grpc_pollset_init(&g_backup_pollset);
-
- /* start the backup poller thread */
- g_shutdown_backup_poller = 0;
- gpr_event_init(&g_backup_poller_done);
- gpr_thd_new(&id, backup_poller, NULL, NULL);
-}
-
-void grpc_pollset_global_shutdown() {
- int i;
-
- /* terminate the backup poller thread */
- gpr_mu_lock(&g_backup_pollset.mu);
- g_shutdown_backup_poller = 1;
- gpr_mu_unlock(&g_backup_pollset.mu);
- gpr_event_wait(&g_backup_poller_done, gpr_inf_future);
-
- /* destroy the backup pollset */
- grpc_pollset_destroy(&g_backup_pollset);
-
- /* destroy the kick shards */
- for (i = 0; i < KICK_SHARDS; i++) {
- close(g_kick_pipes[i][0]);
- close(g_kick_pipes[i][1]);
- }
-}
-
-/* main interface */
-
-static void become_empty_pollset(grpc_pollset *pollset);
-static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd);
-
-void grpc_pollset_init(grpc_pollset *pollset) {
- gpr_mu_init(&pollset->mu);
- gpr_cv_init(&pollset->cv);
- become_empty_pollset(pollset);
-}
-
-void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
- gpr_mu_lock(&pollset->mu);
- pollset->vtable->add_fd(pollset, fd);
- gpr_cv_broadcast(&pollset->cv);
- gpr_mu_unlock(&pollset->mu);
-}
-
-void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
- gpr_mu_lock(&pollset->mu);
- pollset->vtable->del_fd(pollset, fd);
- gpr_cv_broadcast(&pollset->cv);
- gpr_mu_unlock(&pollset->mu);
-}
-
-int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
- /* pollset->mu already held */
- gpr_timespec now;
- now = gpr_now();
- if (gpr_time_cmp(now, deadline) > 0) {
- return 0;
- }
- if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) {
- return 1;
- }
- if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
- return 1;
- }
- return pollset->vtable->maybe_work(pollset, deadline, now, 1);
-}
-
-void grpc_pollset_destroy(grpc_pollset *pollset) {
- pollset->vtable->destroy(pollset);
- gpr_mu_destroy(&pollset->mu);
- gpr_cv_destroy(&pollset->cv);
-}
-
-/*
- * empty_pollset - a vtable that provides polling for NO file descriptors
- */
-
-static void empty_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
- become_unary_pollset(pollset, fd);
-}
-
-static void empty_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {}
-
-static int empty_pollset_maybe_work(grpc_pollset *pollset,
- gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
- return 0;
-}
-
-static void empty_pollset_destroy(grpc_pollset *pollset) {}
-
-static const grpc_pollset_vtable empty_pollset = {
- empty_pollset_add_fd, empty_pollset_del_fd, empty_pollset_maybe_work,
- empty_pollset_destroy};
-
-static void become_empty_pollset(grpc_pollset *pollset) {
- pollset->vtable = &empty_pollset;
-}
-
-/*
- * unary_poll_pollset - a vtable that provides polling for one file descriptor
- * via poll()
- */
-
-static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
- grpc_fd *fds[2];
- if (fd == pollset->data.ptr) return;
- fds[0] = pollset->data.ptr;
- fds[1] = fd;
- grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds));
- grpc_fd_unref(fds[0]);
-}
-
-static void unary_poll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
- if (fd == pollset->data.ptr) {
- grpc_fd_unref(pollset->data.ptr);
- become_empty_pollset(pollset);
- }
-}
-
-static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
- gpr_timespec deadline,
- gpr_timespec now,
- int allow_synchronous_callback) {
- struct pollfd pfd[2];
- grpc_fd *fd;
- int timeout;
- int r;
-
- if (pollset->counter) {
- return 0;
- }
- fd = pollset->data.ptr;
- if (grpc_fd_is_orphaned(fd)) {
- grpc_fd_unref(fd);
- become_empty_pollset(pollset);
- return 0;
- }
- if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
- timeout = -1;
- } else {
- timeout = gpr_time_to_millis(gpr_time_sub(deadline, now));
- if (timeout <= 0) {
- return 1;
- }
- }
- pfd[0].fd = grpc_kick_read_fd(pollset);
- pfd[0].events = POLLIN;
- pfd[0].revents = 0;
- pfd[1].fd = fd->fd;
- pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT);
- pfd[1].revents = 0;
- pollset->counter = 1;
- gpr_mu_unlock(&pollset->mu);
-
- r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout);
- if (r < 0) {
- if (errno != EINTR) {
- gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
- }
- } else if (r == 0) {
- /* do nothing */
- } else {
- if (pfd[0].revents & POLLIN) {
- grpc_kick_drain(pollset);
- }
- if (pfd[1].revents & POLLIN) {
- grpc_fd_become_readable(fd, allow_synchronous_callback);
- }
- if (pfd[1].revents & POLLOUT) {
- grpc_fd_become_writable(fd, allow_synchronous_callback);
- }
- }
-
- gpr_mu_lock(&pollset->mu);
- grpc_fd_end_poll(fd, pollset);
- pollset->counter = 0;
- gpr_cv_broadcast(&pollset->cv);
- return 1;
-}
-
-static void unary_poll_pollset_destroy(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->counter == 0);
- grpc_fd_unref(pollset->data.ptr);
-}
-
-static const grpc_pollset_vtable unary_poll_pollset = {
- unary_poll_pollset_add_fd, unary_poll_pollset_del_fd,
- unary_poll_pollset_maybe_work, unary_poll_pollset_destroy};
-
-static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd) {
- pollset->vtable = &unary_poll_pollset;
- pollset->counter = 0;
- pollset->data.ptr = fd;
- grpc_fd_ref(fd);
-}
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
deleted file mode 100644
index f051079f5b..0000000000
--- a/src/core/iomgr/pollset_posix.h
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_
-#define __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_
-
-#include <grpc/support/sync.h>
-
-typedef struct grpc_pollset_vtable grpc_pollset_vtable;
-
-/* forward declare only in this file to avoid leaking impl details via
- pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not
- use the struct tag */
-struct grpc_fd;
-
-typedef struct grpc_pollset {
- /* pollsets under posix can mutate representation as fds are added and
- removed.
- For example, we may choose a poll() based implementation on linux for
- few fds, and an epoll() based implementation for many fds */
- const grpc_pollset_vtable *vtable;
- gpr_mu mu;
- gpr_cv cv;
- int counter;
- union {
- int fd;
- void *ptr;
- } data;
-} grpc_pollset;
-
-struct grpc_pollset_vtable {
- void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
- void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
- int (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
- gpr_timespec now, int allow_synchronous_callback);
- void (*destroy)(grpc_pollset *pollset);
-};
-
-#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
-#define GRPC_POLLSET_CV(pollset) (&(pollset)->cv)
-
-/* Add an fd to a pollset */
-void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd);
-/* Force remove an fd from a pollset (normally they are removed on the next
- poll after an fd is orphaned) */
-void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd);
-
-/* Force any current pollers to break polling */
-void grpc_pollset_force_kick(grpc_pollset *pollset);
-/* Returns the fd to listen on for kicks */
-int grpc_kick_read_fd(grpc_pollset *p);
-/* Call after polling has been kicked to leave the kicked state */
-void grpc_kick_drain(grpc_pollset *p);
-
-/* All fds get added to a backup pollset to ensure that progress is made
- regardless of applications listening to events. Relying on this is slow
- however (the backup pollset only listens every 100ms or so) - so it's not
- to be relied on. */
-grpc_pollset *grpc_backup_pollset();
-
-/* turn a pollset into a multipoller: platform specific */
-void grpc_platform_become_multipoller(grpc_pollset *pollset,
- struct grpc_fd **fds, size_t fd_count);
-
-#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ */
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index c9c2c5378a..a0a04297eb 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -41,7 +41,7 @@
#include <unistd.h>
#include <string.h>
-#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/iomgr_libevent.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/support/alloc.h>
@@ -201,7 +201,7 @@ static void do_request(void *rp) {
gpr_free(r->default_port);
gpr_free(r);
cb(arg, resolved);
- grpc_iomgr_unref();
+ grpc_iomgr_ref_address_resolution(-1);
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
@@ -213,7 +213,7 @@ void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
gpr_thd_id id;
- grpc_iomgr_ref();
+ grpc_iomgr_ref_address_resolution(1);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->cb = cb;
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index d675c2dcec..88b599b582 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -38,9 +38,7 @@
#include <string.h>
#include <unistd.h>
-#include "src/core/iomgr/alarm.h"
-#include "src/core/iomgr/iomgr_posix.h"
-#include "src/core/iomgr/pollset_posix.h"
+#include "src/core/iomgr/iomgr_libevent.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
@@ -51,11 +49,8 @@
typedef struct {
void (*cb)(void *arg, grpc_endpoint *tcp);
void *cb_arg;
- gpr_mu mu;
grpc_fd *fd;
gpr_timespec deadline;
- grpc_alarm alarm;
- int refs;
} async_connect;
static int prepare_socket(int fd) {
@@ -79,42 +74,21 @@ error:
return 0;
}
-static void on_alarm(void *acp, int success) {
- int done;
- async_connect *ac = acp;
- gpr_mu_lock(&ac->mu);
- if (ac->fd != NULL && success) {
- grpc_fd_shutdown(ac->fd);
- }
- done = (--ac->refs == 0);
- gpr_mu_unlock(&ac->mu);
- if (done) {
- gpr_mu_destroy(&ac->mu);
- gpr_free(ac);
- }
-}
-
-static void on_writable(void *acp, int success) {
+static void on_writable(void *acp, grpc_iomgr_cb_status status) {
async_connect *ac = acp;
int so_error = 0;
socklen_t so_error_size;
int err;
- int fd = ac->fd->fd;
- int done;
- grpc_endpoint *ep = NULL;
- void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
- void *cb_arg = ac->cb_arg;
-
- grpc_alarm_cancel(&ac->alarm);
+ int fd = grpc_fd_get(ac->fd);
- if (success) {
+ if (status == GRPC_CALLBACK_SUCCESS) {
do {
so_error_size = sizeof(so_error);
err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size);
} while (err < 0 && errno == EINTR);
if (err < 0) {
gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno));
- goto finish;
+ goto error;
} else if (so_error != 0) {
if (so_error == ENOBUFS) {
/* We will get one of these errors if we have run out of
@@ -132,7 +106,7 @@ static void on_writable(void *acp, int success) {
opened too many network connections. The "easy" fix:
don't do that! */
gpr_log(GPR_ERROR, "kernel out of buffers");
- grpc_fd_notify_on_write(ac->fd, on_writable, ac);
+ grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline);
return;
} else {
switch (so_error) {
@@ -143,31 +117,27 @@ static void on_writable(void *acp, int success) {
gpr_log(GPR_ERROR, "socket error: %d", so_error);
break;
}
- goto finish;
+ goto error;
}
} else {
- ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
- goto finish;
+ goto great_success;
}
} else {
- gpr_log(GPR_ERROR, "on_writable failed during connect");
- goto finish;
+ gpr_log(GPR_ERROR, "on_writable failed during connect: status=%d", status);
+ goto error;
}
abort();
-finish:
- gpr_mu_lock(&ac->mu);
- if (!ep) {
- grpc_fd_orphan(ac->fd, NULL, NULL);
- }
- done = (--ac->refs == 0);
- gpr_mu_unlock(&ac->mu);
- if (done) {
- gpr_mu_destroy(&ac->mu);
- gpr_free(ac);
- }
- cb(cb_arg, ep);
+error:
+ ac->cb(ac->cb_arg, NULL);
+ grpc_fd_destroy(ac->fd, NULL, NULL);
+ gpr_free(ac);
+ return;
+
+great_success:
+ ac->cb(ac->cb_arg, grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
+ gpr_free(ac);
}
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
@@ -206,7 +176,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
} while (err < 0 && errno == EINTR);
if (err >= 0) {
- gpr_log(GPR_DEBUG, "instant connect");
cb(arg,
grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
return;
@@ -222,10 +191,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac = gpr_malloc(sizeof(async_connect));
ac->cb = cb;
ac->cb_arg = arg;
+ ac->deadline = deadline;
ac->fd = grpc_fd_create(fd);
- gpr_mu_init(&ac->mu);
- ac->refs = 2;
-
- grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
- grpc_fd_notify_on_write(ac->fd, on_writable, ac);
+ grpc_fd_notify_on_write(ac->fd, on_writable, ac, deadline);
}
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 657f34aaf9..bc3ce69e47 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -255,14 +255,18 @@ typedef struct {
grpc_endpoint_read_cb read_cb;
void *read_user_data;
+ gpr_timespec read_deadline;
grpc_endpoint_write_cb write_cb;
void *write_user_data;
+ gpr_timespec write_deadline;
grpc_tcp_slice_state write_state;
} grpc_tcp;
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
-static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success);
+static void grpc_tcp_handle_read(void *arg /* grpc_tcp */,
+ grpc_iomgr_cb_status status);
+static void grpc_tcp_handle_write(void *arg /* grpc_tcp */,
+ grpc_iomgr_cb_status status);
static void grpc_tcp_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
@@ -272,7 +276,7 @@ static void grpc_tcp_shutdown(grpc_endpoint *ep) {
static void grpc_tcp_unref(grpc_tcp *tcp) {
int refcount_zero = gpr_unref(&tcp->refcount);
if (refcount_zero) {
- grpc_fd_orphan(tcp->em_fd, NULL, NULL);
+ grpc_fd_destroy(tcp->em_fd, NULL, NULL);
gpr_free(tcp);
}
}
@@ -304,7 +308,8 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
#define INLINE_SLICE_BUFFER_SIZE 8
#define MAX_READ_IOVEC 4
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
+static void grpc_tcp_handle_read(void *arg /* grpc_tcp */,
+ grpc_iomgr_cb_status status) {
grpc_tcp *tcp = (grpc_tcp *)arg;
int iov_size = 1;
gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
@@ -319,12 +324,18 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
0);
- if (!success) {
+ if (status == GRPC_CALLBACK_CANCELLED) {
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
grpc_tcp_unref(tcp);
return;
}
+ if (status == GRPC_CALLBACK_TIMED_OUT) {
+ call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_TIMED_OUT);
+ grpc_tcp_unref(tcp);
+ return;
+ }
+
/* TODO(klempner): Limit the amount we read at once. */
for (;;) {
allocated_bytes = slice_state_append_blocks_into_iovec(
@@ -366,7 +377,8 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
} else {
/* Spurious read event, consume it here */
slice_state_destroy(&read_state);
- grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp);
+ grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp,
+ tcp->read_deadline);
}
} else {
/* TODO(klempner): Log interesting errors */
@@ -395,13 +407,14 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
}
static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data) {
+ void *user_data, gpr_timespec deadline) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->read_user_data = user_data;
+ tcp->read_deadline = deadline;
gpr_ref(&tcp->refcount);
- grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp);
+ grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, deadline);
}
#define MAX_WRITE_IOVEC 16
@@ -447,24 +460,34 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
};
}
-static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
+static void grpc_tcp_handle_write(void *arg /* grpc_tcp */,
+ grpc_iomgr_cb_status status) {
grpc_tcp *tcp = (grpc_tcp *)arg;
grpc_endpoint_write_status write_status;
grpc_endpoint_cb_status cb_status;
grpc_endpoint_write_cb cb;
- if (!success) {
+ cb_status = GRPC_ENDPOINT_CB_OK;
+
+ if (status == GRPC_CALLBACK_CANCELLED) {
+ cb_status = GRPC_ENDPOINT_CB_SHUTDOWN;
+ } else if (status == GRPC_CALLBACK_TIMED_OUT) {
+ cb_status = GRPC_ENDPOINT_CB_TIMED_OUT;
+ }
+
+ if (cb_status != GRPC_ENDPOINT_CB_OK) {
slice_state_destroy(&tcp->write_state);
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN);
+ cb(tcp->write_user_data, cb_status);
grpc_tcp_unref(tcp);
return;
}
write_status = grpc_tcp_flush(tcp);
if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
- grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp);
+ grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp,
+ tcp->write_deadline);
} else {
slice_state_destroy(&tcp->write_state);
if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
@@ -479,11 +502,9 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
}
}
-static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data) {
+static grpc_endpoint_write_status grpc_tcp_write(
+ grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_endpoint_write_status status;
@@ -509,15 +530,17 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
gpr_ref(&tcp->refcount);
tcp->write_cb = cb;
tcp->write_user_data = user_data;
- grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp);
+ tcp->write_deadline = deadline;
+ grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp,
+ tcp->write_deadline);
}
return status;
}
static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
- grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_pollset_add_fd(pollset, tcp->em_fd);
+ /* tickle the pollset so we crash if things aren't wired correctly */
+ pollset->unused++;
}
static const grpc_endpoint_vtable vtable = {
@@ -527,12 +550,14 @@ static const grpc_endpoint_vtable vtable = {
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
- tcp->fd = em_fd->fd;
+ tcp->fd = grpc_fd_get(em_fd);
tcp->read_cb = NULL;
tcp->write_cb = NULL;
tcp->read_user_data = NULL;
tcp->write_user_data = NULL;
tcp->slice_size = slice_size;
+ tcp->read_deadline = gpr_inf_future;
+ tcp->write_deadline = gpr_inf_future;
slice_state_init(&tcp->write_state, NULL, 0, 0);
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h
index c3eef1b4b7..830394d534 100644
--- a/src/core/iomgr/tcp_posix.h
+++ b/src/core/iomgr/tcp_posix.h
@@ -45,7 +45,7 @@
*/
#include "src/core/iomgr/endpoint.h"
-#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/iomgr_libevent.h"
#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h
index 1968246b75..46fba13f90 100644
--- a/src/core/iomgr/tcp_server.h
+++ b/src/core/iomgr/tcp_server.h
@@ -49,8 +49,8 @@ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep);
grpc_tcp_server *grpc_tcp_server_create();
/* Start listening to bound ports */
-void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset *pollset,
- grpc_tcp_server_cb cb, void *cb_arg);
+void grpc_tcp_server_start(grpc_tcp_server *server, grpc_tcp_server_cb cb,
+ void *cb_arg);
/* Add a port to the server, returning true on success, or false otherwise.
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 5ed517748a..2abaf15ce4 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -45,7 +45,7 @@
#include <string.h>
#include <errno.h>
-#include "src/core/iomgr/pollset_posix.h"
+#include "src/core/iomgr/iomgr_libevent.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
@@ -97,8 +97,13 @@ grpc_tcp_server *grpc_tcp_server_create() {
return s;
}
+static void done_destroy(void *p, grpc_iomgr_cb_status status) {
+ gpr_event_set(p, (void *)1);
+}
+
void grpc_tcp_server_destroy(grpc_tcp_server *s) {
size_t i;
+ gpr_event fd_done;
gpr_mu_lock(&s->mu);
/* shutdown all fd's */
for (i = 0; i < s->nports; i++) {
@@ -113,7 +118,9 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s) {
/* delete ALL the things */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
- grpc_fd_orphan(sp->emfd, NULL, NULL);
+ gpr_event_init(&fd_done);
+ grpc_fd_destroy(sp->emfd, done_destroy, &fd_done);
+ gpr_event_wait(&fd_done, gpr_inf_future);
}
gpr_free(s->ports);
gpr_free(s);
@@ -189,10 +196,10 @@ error:
}
/* event manager callback when reads are ready */
-static void on_read(void *arg, int success) {
+static void on_read(void *arg, grpc_iomgr_cb_status status) {
server_port *sp = arg;
- if (!success) {
+ if (status != GRPC_CALLBACK_SUCCESS) {
goto error;
}
@@ -208,7 +215,7 @@ static void on_read(void *arg, int success) {
case EINTR:
continue;
case EAGAIN:
- grpc_fd_notify_on_read(sp->emfd, on_read, sp);
+ grpc_fd_notify_on_read(sp->emfd, on_read, sp, gpr_inf_future);
return;
default:
gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
@@ -247,10 +254,15 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
s->ports = gpr_realloc(s->ports, sizeof(server_port *) * s->port_capacity);
}
sp = &s->ports[s->nports++];
- sp->server = s;
- sp->fd = fd;
sp->emfd = grpc_fd_create(fd);
- GPR_ASSERT(sp->emfd);
+ sp->fd = fd;
+ sp->server = s;
+ /* initialize the em desc */
+ if (sp->emfd == NULL) {
+ s->nports--;
+ gpr_mu_unlock(&s->mu);
+ return 0;
+ }
gpr_mu_unlock(&s->mu);
return 1;
@@ -307,8 +319,8 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, int index) {
return (0 <= index && index < s->nports) ? s->ports[index].fd : -1;
}
-void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset,
- grpc_tcp_server_cb cb, void *cb_arg) {
+void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb,
+ void *cb_arg) {
size_t i;
GPR_ASSERT(cb);
gpr_mu_lock(&s->mu);
@@ -317,10 +329,8 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset,
s->cb = cb;
s->cb_arg = cb_arg;
for (i = 0; i < s->nports; i++) {
- if (pollset) {
- grpc_pollset_add_fd(pollset, s->ports[i].emfd);
- }
- grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i]);
+ grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i],
+ gpr_inf_future);
s->active_ports++;
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index c99ac8021d..442d2fa624 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -555,11 +555,12 @@ static int fake_oauth2_has_request_metadata_only(
return 1;
}
-void on_simulated_token_fetch_done(void *user_data, int success) {
+void on_simulated_token_fetch_done(void *user_data,
+ grpc_iomgr_cb_status status) {
grpc_credentials_metadata_request *r =
(grpc_credentials_metadata_request *)user_data;
grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)r->creds;
- GPR_ASSERT(success);
+ GPR_ASSERT(status == GRPC_CALLBACK_SUCCESS);
r->cb(r->user_data, &c->access_token_md, 1, GRPC_CREDENTIALS_OK);
grpc_credentials_metadata_request_destroy(r);
}
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index 7f0fdf73c9..cab09ca49d 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -184,7 +184,8 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
}
static void endpoint_notify_on_read(grpc_endpoint *secure_ep,
- grpc_endpoint_read_cb cb, void *user_data) {
+ grpc_endpoint_read_cb cb, void *user_data,
+ gpr_timespec deadline) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
ep->read_cb = cb;
ep->read_user_data = user_data;
@@ -199,7 +200,7 @@ static void endpoint_notify_on_read(grpc_endpoint *secure_ep,
return;
}
- grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep);
+ grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep, deadline);
}
static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
@@ -216,11 +217,9 @@ static void on_write(void *data, grpc_endpoint_cb_status error) {
secure_endpoint_unref(ep);
}
-static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data) {
+static grpc_endpoint_write_status endpoint_write(
+ grpc_endpoint *secure_ep, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) {
int i = 0;
int output_buffer_count = 0;
tsi_result result = TSI_OK;
@@ -309,7 +308,7 @@ static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep,
/* Need to keep the endpoint alive across a transport */
secure_endpoint_ref(ep);
status = grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices,
- output_buffer_count, on_write, ep);
+ output_buffer_count, on_write, ep, deadline);
if (status != GRPC_ENDPOINT_WRITE_PENDING) {
secure_endpoint_unref(ep);
}
diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c
index 3df91ed8e7..eb11251912 100644
--- a/src/core/security/secure_transport_setup.c
+++ b/src/core/security/secure_transport_setup.c
@@ -105,7 +105,6 @@ static void check_peer(grpc_secure_transport_setup *s) {
grpc_security_status peer_status;
tsi_peer peer;
tsi_result result = tsi_handshaker_extract_peer(s->handshaker, &peer);
-
if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Peer extraction failed with error %s",
tsi_result_to_string(result));
@@ -153,8 +152,9 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset);
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- write_status = grpc_endpoint_write(s->endpoint, &to_send, 1,
- on_handshake_data_sent_to_peer, s);
+ write_status =
+ grpc_endpoint_write(s->endpoint, &to_send, 1,
+ on_handshake_data_sent_to_peer, s, gpr_inf_future);
if (write_status == GRPC_ENDPOINT_WRITE_ERROR) {
gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
secure_transport_setup_done(s, 0);
@@ -200,7 +200,8 @@ static void on_handshake_data_received_from_peer(
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
grpc_endpoint_notify_on_read(s->endpoint,
- on_handshake_data_received_from_peer, setup);
+ on_handshake_data_received_from_peer, setup,
+ gpr_inf_future);
cleanup_slices(slices, nslices);
return;
} else {
@@ -257,7 +258,8 @@ static void on_handshake_data_sent_to_peer(void *setup,
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
grpc_endpoint_notify_on_read(s->endpoint,
- on_handshake_data_received_from_peer, setup);
+ on_handshake_data_received_from_peer, setup,
+ gpr_inf_future);
} else {
check_peer(s);
}
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 9d7c0e5e5a..28b56dd4c9 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -77,9 +77,9 @@ static void on_accept(void *server, grpc_endpoint *tcp) {
/* Note: the following code is the same with server_chttp2.c */
/* Server callback: start listening on our ports */
-static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) {
+static void start(grpc_server *server, void *tcpp) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(tcp, pollset, on_accept, server);
+ grpc_tcp_server_start(tcp, on_accept, server);
}
/* Server callback: destroy the tcp listener (so we don't generate further
diff --git a/src/core/support/time.c b/src/core/support/time.c
index 5330092f56..712bdf441c 100644
--- a/src/core/support/time.c
+++ b/src/core/support/time.c
@@ -249,18 +249,3 @@ gpr_timespec gpr_timespec_from_timeval(struct timeval t) {
ts.tv_nsec = t.tv_usec * 1000;
return ts;
}
-
-gpr_int32 gpr_time_to_millis(gpr_timespec t) {
- if (t.tv_sec >= 2147483) {
- if (t.tv_sec == 2147483 && t.tv_nsec < 648 * GPR_NS_PER_MS) {
- return 2147483 * GPR_MS_PER_SEC + t.tv_nsec / GPR_NS_PER_MS;
- }
- return 2147483647;
- } else if (t.tv_sec <= -2147483) {
- /* TODO(ctiller): correct handling here (it's so far in the past do we
- care?) */
- return -2147483648;
- } else {
- return t.tv_sec * GPR_MS_PER_SEC + t.tv_nsec / GPR_NS_PER_MS;
- }
-}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 9c5f5064eb..9ed617f665 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -878,9 +878,9 @@ grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) {
return &call->incoming_metadata;
}
-static void call_alarm(void *arg, int success) {
+static void call_alarm(void *arg, grpc_iomgr_cb_status status) {
grpc_call *call = arg;
- if (success) {
+ if (status == GRPC_CALLBACK_SUCCESS) {
grpc_call_cancel(call);
}
grpc_call_internal_unref(call);
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index b59c36e03a..4837f5b978 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -36,7 +36,7 @@
#include <stdio.h>
#include <string.h>
-#include "src/core/iomgr/pollset.h"
+#include "src/core/iomgr/iomgr_completion_queue_interface.h"
#include "src/core/surface/call.h"
#include "src/core/surface/event_string.h"
#include "src/core/surface/surface_trace.h"
@@ -61,7 +61,6 @@ typedef struct event {
/* Completion queue structure */
struct grpc_completion_queue {
- /* TODO(ctiller): see if this can be removed */
int allow_polling;
/* When refs drops to zero, we are in shutdown mode, and will be destroyable
@@ -101,7 +100,7 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
/* Create and append an event to the queue. Returns the event so that its data
members can be filled in.
- Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
+ Requires grpc_iomgr_mu locked. */
static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data) {
@@ -127,8 +126,7 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
}
- gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
- grpc_pollset_kick(&cc->pollset);
+ gpr_cv_broadcast(&grpc_iomgr_cv);
return ev;
}
@@ -151,7 +149,7 @@ static void end_op_locked(grpc_completion_queue *cc,
if (gpr_unref(&cc->refs)) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
+ gpr_cv_broadcast(&grpc_iomgr_cv);
}
}
@@ -159,11 +157,11 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_byte_buffer *read) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data);
ev->base.data.read = read;
end_op_locked(cc, GRPC_READ);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
@@ -171,11 +169,11 @@ void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.invoke_accepted = error;
end_op_locked(cc, GRPC_INVOKE_ACCEPTED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
@@ -183,11 +181,11 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.write_accepted = error;
end_op_locked(cc, GRPC_WRITE_ACCEPTED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
@@ -195,11 +193,11 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.finish_accepted = error;
end_op_locked(cc, GRPC_FINISH_ACCEPTED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
@@ -208,13 +206,13 @@ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
void *user_data, size_t count,
grpc_metadata *elements) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish,
user_data);
ev->base.data.client_metadata_read.count = count;
ev->base.data.client_metadata_read.elements = elements;
end_op_locked(cc, GRPC_CLIENT_METADATA_READ);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
@@ -223,14 +221,14 @@ void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_metadata *metadata_elements,
size_t metadata_count) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data);
ev->base.data.finished.status = status;
ev->base.data.finished.details = details;
ev->base.data.finished.metadata_count = metadata_count;
ev->base.data.finished.metadata_elements = metadata_elements;
end_op_locked(cc, GRPC_FINISHED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
@@ -239,7 +237,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
gpr_timespec deadline, size_t metadata_count,
grpc_metadata *metadata_elements) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data);
ev->base.data.server_rpc_new.method = method;
ev->base.data.server_rpc_new.host = host;
@@ -247,7 +245,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
ev->base.data.server_rpc_new.metadata_count = metadata_count;
ev->base.data.server_rpc_new.metadata_elements = metadata_elements;
end_op_locked(cc, GRPC_SERVER_RPC_NEW);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
@@ -264,7 +262,7 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
event *ev = NULL;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
for (;;) {
if (cc->queue != NULL) {
gpr_uintptr bucket;
@@ -290,16 +288,15 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
ev = create_shutdown_event();
break;
}
- if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
+ if (cc->allow_polling && grpc_iomgr_work(deadline)) {
continue;
}
- if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
- GRPC_POLLSET_MU(&cc->pollset), deadline)) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
+ gpr_mu_unlock(&grpc_iomgr_mu);
return NULL;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
}
@@ -337,7 +334,7 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
event *ev = NULL;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
for (;;) {
if ((ev = pluck_event(cc, tag))) {
break;
@@ -346,16 +343,15 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ev = create_shutdown_event();
break;
}
- if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
+ if (cc->allow_polling && grpc_iomgr_work(deadline)) {
continue;
}
- if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
- GRPC_POLLSET_MU(&cc->pollset), deadline)) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
+ gpr_mu_unlock(&grpc_iomgr_mu);
return NULL;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
}
@@ -364,11 +360,11 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if (gpr_unref(&cc->refs)) {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index aa544a97f2..3829e7aa8f 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -52,7 +52,7 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
typedef struct listener {
void *arg;
- void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset);
+ void (*start)(grpc_server *server, void *arg);
void (*destroy)(grpc_server *server, void *arg);
struct listener *next;
} listener;
@@ -192,7 +192,7 @@ static void orphan_channel(channel_data *chand) {
chand->next = chand->prev = chand;
}
-static void finish_destroy_channel(void *cd, int success) {
+static void finish_destroy_channel(void *cd, grpc_iomgr_cb_status status) {
channel_data *chand = cd;
grpc_server *server = chand->server;
/*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/
@@ -247,7 +247,7 @@ static void start_new_rpc(grpc_call_element *elem) {
gpr_mu_unlock(&server->mu);
}
-static void kill_zombie(void *elem, int success) {
+static void kill_zombie(void *elem, grpc_iomgr_cb_status status) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
@@ -336,7 +336,7 @@ static void channel_op(grpc_channel_element *elem,
}
}
-static void finish_shutdown_channel(void *cd, int success) {
+static void finish_shutdown_channel(void *cd, grpc_iomgr_cb_status status) {
channel_data *chand = cd;
grpc_channel_op op;
op.type = GRPC_CHANNEL_DISCONNECT;
@@ -468,7 +468,7 @@ void grpc_server_start(grpc_server *server) {
listener *l;
for (l = server->listeners; l; l = l->next) {
- l->start(server, l->arg, grpc_cq_pollset(server->cq));
+ l->start(server, l->arg);
}
}
@@ -596,8 +596,7 @@ void grpc_server_destroy(grpc_server *server) {
}
void grpc_server_add_listener(grpc_server *server, void *arg,
- void (*start)(grpc_server *server, void *arg,
- grpc_pollset *pollset),
+ void (*start)(grpc_server *server, void *arg),
void (*destroy)(grpc_server *server, void *arg)) {
listener *l = gpr_malloc(sizeof(listener));
l->arg = arg;
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index 61292ebe4e..f0773ab9d5 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -47,8 +47,7 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
/* Add a listener to the server: when the server starts, it will call start,
and when it shuts down, it will call destroy */
void grpc_server_add_listener(grpc_server *server, void *listener,
- void (*start)(grpc_server *server, void *arg,
- grpc_pollset *pollset),
+ void (*start)(grpc_server *server, void *arg),
void (*destroy)(grpc_server *server, void *arg));
/* Setup a transport - creates a channel stack, binds the transport to the
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index a0961bd449..a5fdd03774 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -59,9 +59,9 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
}
/* Server callback: start listening on our ports */
-static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) {
+static void start(grpc_server *server, void *tcpp) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(tcp, pollset, new_transport, server);
+ grpc_tcp_server_start(tcp, new_transport, server);
}
/* Server callback: destroy the tcp listener (so we don't generate further
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 5bf763e76f..a8ae8cc5bc 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -711,7 +711,7 @@ static void unlock(transport *t) {
/* write some bytes if necessary */
while (start_write) {
switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count,
- finish_write, t)) {
+ finish_write, t, gpr_inf_future)) {
case GRPC_ENDPOINT_WRITE_DONE:
/* grab the lock directly without wrappers since we just want to
continue writes if we loop: no need to check read callbacks again */
@@ -1617,6 +1617,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
case GRPC_ENDPOINT_CB_SHUTDOWN:
case GRPC_ENDPOINT_CB_EOF:
case GRPC_ENDPOINT_CB_ERROR:
+ case GRPC_ENDPOINT_CB_TIMED_OUT:
lock(t);
drop_connection(t);
t->reading = 0;
@@ -1641,7 +1642,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
if (keep_reading) {
- grpc_endpoint_notify_on_read(t->ep, recv_data, t);
+ grpc_endpoint_notify_on_read(t->ep, recv_data, t, gpr_inf_future);
}
}
diff --git a/src/ruby/Gemfile b/src/ruby/Gemfile
index 597a7d4f4b..dc05f7946b 100755
--- a/src/ruby/Gemfile
+++ b/src/ruby/Gemfile
@@ -1,4 +1,12 @@
source 'https://rubygems.org'
+# Modify this when working locally, see README.md
+# e.g,
+# gem 'beefcake', path: "/usr/local/google/repos/beefcake"
+#
+# The default value is what's used for gRPC ruby's GCE configuration
+#
+gem 'beefcake', path: "/var/local/git/beefcake"
+
# Specify your gem's dependencies in grpc.gemspec
gemspec
diff --git a/src/ruby/README.md b/src/ruby/README.md
index 23aec2b20a..3a5c50819b 100755
--- a/src/ruby/README.md
+++ b/src/ruby/README.md
@@ -20,25 +20,67 @@ DEPENDENCIES
The extension can be built and tested using
[rake](https://rubygems.org/gems/rake). However, the rake-extensiontask rule
is not supported on older versions of rubygems, and the necessary version of
-rubygems.
+rubygems is not available on the latest version of Goobuntu.
This is resolved by using [RVM](https://rvm.io/) instead; install a single-user
-ruby environment, and develop on the latest stable version of ruby (2.1.5).
+ruby environment, and develop on the latest stable version of ruby (2.1.2).
+
+
+* Proto code generation
+
+To build generate service stubs and skeletons, it's currently necessary to use
+a patched version of a beefcake, a simple third-party proto2 library. This is
+feature compatible with proto3 and will be replaced by official proto3 support
+in protoc.
+
+* Patched protoc
+
+The patched version of beefcake in turn depends on a patched version of protoc.
+This is an update of the latest open source release of protoc with some forward
+looking proto3 patches.
INSTALLATION PREREQUISITES
--------------------------
+Install the patched protoc
+
+$ cd <git_repo_dir>
+$ git clone sso://team/one-platform-grpc-team/protobuf
+$ cd protobuf
+$ ./configure --prefix=/usr
+$ make
+$ sudo make install
+
+Install an update to OpenSSL with ALPN support
+
+$ wget https://www.openssl.org/source/openssl-1.0.2-beta3.tar.gz
+$ tar -zxvf openssl-1.0.2-beta3.tar.gz
+$ cd openssl-1.0.2-beta3
+$ ./config shared
+$ make
+$ sudo make install
+
Install RVM
+$ # the -with-openssl-dir ensures that ruby uses the updated version of SSL
$ command curl -sSL https://rvm.io/mpapis.asc | gpg --import -
$ \curl -sSL https://get.rvm.io | bash -s stable --ruby
$
$ # follow the instructions to ensure that your're using the latest stable version of Ruby
$ # and that the rvm command is installed
$
+$ rvm reinstall 2.1.5 --with-openssl-dir=/usr/local/ssl
$ gem install bundler # install bundler, the standard ruby package manager
+Install the patched beefcake, and update the Gemfile to reference
+
+$ cd <git_repo_dir>
+$ git clone sso://team/one-platform-grpc-team/grpc-ruby-beefcake beefcake
+$ cd beefcake
+$ bundle install
+$
+
HACKING
-------
diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb
index d0478bb4d1..6e7bfeef97 100644
--- a/src/ruby/bin/interop/interop_client.rb
+++ b/src/ruby/bin/interop/interop_client.rb
@@ -47,11 +47,9 @@ require 'minitest'
require 'minitest/assertions'
require 'grpc'
-require 'google/protobuf'
-require 'test/cpp/interop/test_services'
-require 'test/cpp/interop/messages'
-require 'test/cpp/interop/empty'
+require 'third_party/stubby/testing/proto/test.pb'
+require 'third_party/stubby/testing/proto/messages.pb'
# loads the certificates used to access the test server securely.
def load_test_certs
@@ -81,7 +79,7 @@ end
# produces a string of null chars (\0) of length l.
def nulls(l)
raise 'requires #{l} to be +ve' if l < 0
- [].pack('x' * l).force_encoding('utf-8')
+ [].pack('x' * l)
end
# a PingPongPlayer implements the ping pong bidi test.
@@ -107,11 +105,10 @@ class PingPongPlayer
req_size, resp_size = m
req = req_cls.new(:payload => Payload.new(:body => nulls(req_size)),
:response_type => COMPRESSABLE,
- :response_parameters => [p_cls.new(:size => resp_size)])
+ :response_parameters => p_cls.new(:size => resp_size))
yield req
resp = @queue.pop
- assert_equal(PayloadType.lookup(COMPRESSABLE), resp.payload.type,
- 'payload type is wrong')
+ assert_equal(COMPRESSABLE, resp.payload.type, 'payload type is wrong')
assert_equal(resp_size, resp.payload.body.length,
'payload body #{i} has the wrong length')
p "OK: ping_pong #{count}"
@@ -135,10 +132,10 @@ class NamedTests
# TESTING
# PASSED
# FAIL
- # ruby server: fails protobuf-ruby can't pass an empty message
+ # ruby server: fails beefcake throws on deserializing the 0-length message
def empty_unary
- resp = @stub.empty_call(Empty.new)
- assert resp.is_a?(Empty), 'empty_unary: invalid response'
+ resp = @stub.empty_call(Proto2::Empty.new)
+ assert resp.is_a?(Proto::Empty), 'empty_unary: invalid response'
p 'OK: empty_unary'
end
@@ -189,8 +186,7 @@ class NamedTests
resps = @stub.streaming_output_call(req)
resps.each_with_index do |r, i|
assert i < msg_sizes.length, 'too many responses'
- assert_equal(PayloadType.lookup(COMPRESSABLE), r.payload.type,
- 'payload type is wrong')
+ assert_equal(COMPRESSABLE, r.payload.type, 'payload type is wrong')
assert_equal(msg_sizes[i], r.payload.body.length,
'payload body #{i} has the wrong length')
end
@@ -201,6 +197,9 @@ class NamedTests
# PASSED
# ruby server
# FAILED
+ #
+ # TODO(temiola): update this test to stay consistent with the java test's
+ # interpretation of the test spec.
def ping_pong
msg_sizes = [[27182, 31415], [8, 9], [1828, 2653], [45904, 58979]]
ppp = PingPongPlayer.new(msg_sizes)
diff --git a/src/ruby/bin/interop/interop_server.rb b/src/ruby/bin/interop/interop_server.rb
index 53e271e80d..35d69f6fd3 100644
--- a/src/ruby/bin/interop/interop_server.rb
+++ b/src/ruby/bin/interop/interop_server.rb
@@ -47,9 +47,8 @@ require 'optparse'
require 'grpc'
-require 'test/cpp/interop/test_services'
-require 'test/cpp/interop/messages'
-require 'test/cpp/interop/empty'
+require 'third_party/stubby/testing/proto/test.pb'
+require 'third_party/stubby/testing/proto/messages.pb'
# loads the certificates by the test server.
def load_test_certs
@@ -68,7 +67,7 @@ end
# produces a string of null chars (\0) of length l.
def nulls(l)
raise 'requires #{l} to be +ve' if l < 0
- [].pack('x' * l).force_encoding('utf-8')
+ [].pack('x' * l)
end
# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item.
@@ -99,7 +98,7 @@ class TestTarget < Grpc::Testing::TestService::Service
include Grpc::Testing::PayloadType
def empty_call(empty, call)
- Empty.new
+ Proto::Empty.new
end
def unary_call(simple_req, call)
diff --git a/src/ruby/bin/interop/net/proto2/bridge/proto/message_set.pb.rb b/src/ruby/bin/interop/net/proto2/bridge/proto/message_set.pb.rb
new file mode 100755
index 0000000000..eadd1d4ceb
--- /dev/null
+++ b/src/ruby/bin/interop/net/proto2/bridge/proto/message_set.pb.rb
@@ -0,0 +1,14 @@
+## Generated from net/proto2/bridge/proto/message_set.proto for proto2.bridge
+require 'beefcake'
+
+module Proto2
+ module Bridge
+
+ class MessageSet
+ include Beefcake::Message
+ end
+
+ class MessageSet
+ end
+ end
+end
diff --git a/src/ruby/bin/interop/net/proto2/proto/empty.pb.rb b/src/ruby/bin/interop/net/proto2/proto/empty.pb.rb
new file mode 100755
index 0000000000..2aa0c76509
--- /dev/null
+++ b/src/ruby/bin/interop/net/proto2/proto/empty.pb.rb
@@ -0,0 +1,12 @@
+## Generated from net/proto2/proto/empty.proto for proto2
+require 'beefcake'
+
+module Proto2
+
+ class Empty
+ include Beefcake::Message
+ end
+
+ class Empty
+ end
+end
diff --git a/src/ruby/bin/interop/test/cpp/interop/empty.rb b/src/ruby/bin/interop/test/cpp/interop/empty.rb
deleted file mode 100644
index acd4160d24..0000000000
--- a/src/ruby/bin/interop/test/cpp/interop/empty.rb
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright 2014, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# source: test/cpp/interop/empty.proto
-
-require 'google/protobuf'
-
-Google::Protobuf::DescriptorPool.generated_pool.build do
- add_message "grpc.testing.Empty" do
- end
-end
-
-module Grpc
- module Testing
- Empty = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Empty").msgclass
- end
-end
diff --git a/src/ruby/bin/interop/test/cpp/interop/messages.rb b/src/ruby/bin/interop/test/cpp/interop/messages.rb
deleted file mode 100644
index 491608bff2..0000000000
--- a/src/ruby/bin/interop/test/cpp/interop/messages.rb
+++ /dev/null
@@ -1,86 +0,0 @@
-# Copyright 2014, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# source: test/cpp/interop/messages.proto
-
-require 'google/protobuf'
-
-Google::Protobuf::DescriptorPool.generated_pool.build do
- add_message "grpc.testing.Payload" do
- optional :type, :enum, 1, "grpc.testing.PayloadType"
- optional :body, :string, 2
- end
- add_message "grpc.testing.SimpleRequest" do
- optional :response_type, :enum, 1, "grpc.testing.PayloadType"
- optional :response_size, :int32, 2
- optional :payload, :message, 3, "grpc.testing.Payload"
- end
- add_message "grpc.testing.SimpleResponse" do
- optional :payload, :message, 1, "grpc.testing.Payload"
- optional :effective_gaia_user_id, :int64, 2
- end
- add_message "grpc.testing.StreamingInputCallRequest" do
- optional :payload, :message, 1, "grpc.testing.Payload"
- end
- add_message "grpc.testing.StreamingInputCallResponse" do
- optional :aggregated_payload_size, :int32, 1
- end
- add_message "grpc.testing.ResponseParameters" do
- optional :size, :int32, 1
- optional :interval_us, :int32, 2
- end
- add_message "grpc.testing.StreamingOutputCallRequest" do
- optional :response_type, :enum, 1, "grpc.testing.PayloadType"
- repeated :response_parameters, :message, 2, "grpc.testing.ResponseParameters"
- optional :payload, :message, 3, "grpc.testing.Payload"
- end
- add_message "grpc.testing.StreamingOutputCallResponse" do
- optional :payload, :message, 1, "grpc.testing.Payload"
- end
- add_enum "grpc.testing.PayloadType" do
- value :COMPRESSABLE, 0
- value :UNCOMPRESSABLE, 1
- value :RANDOM, 2
- end
-end
-
-module Grpc
- module Testing
- Payload = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Payload").msgclass
- SimpleRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleRequest").msgclass
- SimpleResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleResponse").msgclass
- StreamingInputCallRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingInputCallRequest").msgclass
- StreamingInputCallResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingInputCallResponse").msgclass
- ResponseParameters = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ResponseParameters").msgclass
- StreamingOutputCallRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingOutputCallRequest").msgclass
- StreamingOutputCallResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingOutputCallResponse").msgclass
- PayloadType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.PayloadType").enummodule
- end
-end
diff --git a/src/ruby/bin/interop/test/cpp/interop/test.rb b/src/ruby/bin/interop/test/cpp/interop/test.rb
deleted file mode 100644
index 0b391ed6af..0000000000
--- a/src/ruby/bin/interop/test/cpp/interop/test.rb
+++ /dev/null
@@ -1,43 +0,0 @@
-# Copyright 2014, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# source: test/cpp/interop/test.proto
-
-require 'google/protobuf'
-
-require 'test/cpp/interop/empty'
-require 'test/cpp/interop/messages'
-Google::Protobuf::DescriptorPool.generated_pool.build do
-end
-
-module Grpc
- module Testing
- end
-end
diff --git a/src/ruby/bin/interop/test/cpp/interop/test_services.rb b/src/ruby/bin/interop/test/cpp/interop/test_services.rb
deleted file mode 100644
index 17b5461d3e..0000000000
--- a/src/ruby/bin/interop/test/cpp/interop/test_services.rb
+++ /dev/null
@@ -1,60 +0,0 @@
-# Copyright 2014, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# Source: test/cpp/interop/test.proto for package 'grpc.testing'
-
-require 'grpc'
-require 'test/cpp/interop/test'
-
-module Grpc
- module Testing
- module TestService
-
- # TODO: add proto service documentation here
- class Service
-
- include GRPC::GenericService
-
- self.marshal_class_method = :encode
- self.unmarshal_class_method = :decode
- self.service_name = 'grpc.testing.TestService'
-
- rpc :EmptyCall, Empty, Empty
- rpc :UnaryCall, SimpleRequest, SimpleResponse
- rpc :StreamingOutputCall, StreamingOutputCallRequest, stream(StreamingOutputCallResponse)
- rpc :StreamingInputCall, stream(StreamingInputCallRequest), StreamingInputCallResponse
- rpc :FullDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse)
- rpc :HalfDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse)
- end
-
- Stub = Service.rpc_stub_class
- end
- end
-end
diff --git a/src/ruby/bin/interop/third_party/stubby/testing/proto/messages.pb.rb b/src/ruby/bin/interop/third_party/stubby/testing/proto/messages.pb.rb
new file mode 100755
index 0000000000..9a913f93e5
--- /dev/null
+++ b/src/ruby/bin/interop/third_party/stubby/testing/proto/messages.pb.rb
@@ -0,0 +1,94 @@
+## Generated from third_party/stubby/testing/proto/messages.proto for grpc.testing
+require 'beefcake'
+
+require 'net/proto2/bridge/proto/message_set.pb'
+
+module Grpc
+ module Testing
+
+ module PayloadType
+ COMPRESSABLE = 0
+ UNCOMPRESSABLE = 1
+ RANDOM = 2
+ end
+
+ class Payload
+ include Beefcake::Message
+ end
+
+ class SimpleRequest
+ include Beefcake::Message
+ end
+
+ class SimpleResponse
+ include Beefcake::Message
+ end
+
+ class SimpleContext
+ include Beefcake::Message
+ end
+
+ class StreamingInputCallRequest
+ include Beefcake::Message
+ end
+
+ class StreamingInputCallResponse
+ include Beefcake::Message
+ end
+
+ class ResponseParameters
+ include Beefcake::Message
+ end
+
+ class StreamingOutputCallRequest
+ include Beefcake::Message
+ end
+
+ class StreamingOutputCallResponse
+ include Beefcake::Message
+ end
+
+ class Payload
+ optional :type, PayloadType, 1
+ optional :body, :bytes, 2
+ end
+
+ class SimpleRequest
+ optional :response_type, PayloadType, 1
+ optional :response_size, :int32, 2
+ optional :payload, Payload, 3
+ end
+
+ class SimpleResponse
+ optional :payload, Payload, 1
+ optional :effective_gaia_user_id, :int64, 2
+ end
+
+ class SimpleContext
+ optional :value, :string, 1
+ end
+
+ class StreamingInputCallRequest
+ optional :payload, Payload, 1
+ end
+
+ class StreamingInputCallResponse
+ optional :aggregated_payload_size, :int32, 1
+ end
+
+ class ResponseParameters
+ optional :size, :int32, 1
+ optional :interval_us, :int32, 2
+ end
+
+ class StreamingOutputCallRequest
+ optional :response_type, PayloadType, 1
+ repeated :response_parameters, ResponseParameters, 2
+ optional :payload, Payload, 3
+ end
+
+ class StreamingOutputCallResponse
+ optional :payload, Payload, 1
+ end
+ end
+end
diff --git a/src/ruby/bin/interop/third_party/stubby/testing/proto/test.pb.rb b/src/ruby/bin/interop/third_party/stubby/testing/proto/test.pb.rb
new file mode 100755
index 0000000000..2e21460fe6
--- /dev/null
+++ b/src/ruby/bin/interop/third_party/stubby/testing/proto/test.pb.rb
@@ -0,0 +1,30 @@
+## Generated from third_party/stubby/testing/proto/test.proto for grpc.testing
+require 'beefcake'
+require 'grpc'
+
+require 'third_party/stubby/testing/proto/messages.pb'
+require 'net/proto2/proto/empty.pb'
+
+module Grpc
+ module Testing
+
+ module TestService
+
+ class Service
+ include GRPC::GenericService
+
+ self.marshal_class_method = :encode
+ self.unmarshal_class_method = :decode
+
+ rpc :EmptyCall, Proto2::Empty, Proto2::Empty
+ rpc :UnaryCall, SimpleRequest, SimpleResponse
+ rpc :StreamingOutputCall, StreamingOutputCallRequest, stream(StreamingOutputCallResponse)
+ rpc :StreamingInputCall, stream(StreamingInputCallRequest), StreamingInputCallResponse
+ rpc :FullDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse)
+ rpc :HalfDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse)
+ end
+ Stub = Service.rpc_stub_class
+
+ end
+ end
+end
diff --git a/src/ruby/bin/math.pb.rb b/src/ruby/bin/math.pb.rb
new file mode 100755
index 0000000000..f6976be568
--- /dev/null
+++ b/src/ruby/bin/math.pb.rb
@@ -0,0 +1,65 @@
+## Generated from bin/math.proto for math
+require "beefcake"
+require "grpc"
+
+module Math
+
+ class DivArgs
+ include Beefcake::Message
+ end
+
+ class DivReply
+ include Beefcake::Message
+ end
+
+ class FibArgs
+ include Beefcake::Message
+ end
+
+ class Num
+ include Beefcake::Message
+ end
+
+ class FibReply
+ include Beefcake::Message
+ end
+
+ class DivArgs
+ required :dividend, :int64, 1
+ required :divisor, :int64, 2
+ end
+
+ class DivReply
+ required :quotient, :int64, 1
+ required :remainder, :int64, 2
+ end
+
+ class FibArgs
+ optional :limit, :int64, 1
+ end
+
+ class Num
+ required :num, :int64, 1
+ end
+
+ class FibReply
+ required :count, :int64, 1
+ end
+
+ module Math
+
+ class Service
+ include GRPC::GenericService
+
+ self.marshal_class_method = :encode
+ self.unmarshal_class_method = :decode
+
+ rpc :Div, DivArgs, DivReply
+ rpc :DivMany, stream(DivArgs), stream(DivReply)
+ rpc :Fib, FibArgs, stream(Num)
+ rpc :Sum, stream(Num), Num
+ end
+ Stub = Service.rpc_stub_class
+
+ end
+end
diff --git a/src/ruby/bin/math.proto b/src/ruby/bin/math.proto
index c49787ad54..de18a50260 100755
--- a/src/ruby/bin/math.proto
+++ b/src/ruby/bin/math.proto
@@ -1,15 +1,15 @@
-syntax = "proto3";
+syntax = "proto2";
package math;
message DivArgs {
- optional int64 dividend = 1;
- optional int64 divisor = 2;
+ required int64 dividend = 1;
+ required int64 divisor = 2;
}
message DivReply {
- optional int64 quotient = 1;
- optional int64 remainder = 2;
+ required int64 quotient = 1;
+ required int64 remainder = 2;
}
message FibArgs {
@@ -17,11 +17,11 @@ message FibArgs {
}
message Num {
- optional int64 num = 1;
+ required int64 num = 1;
}
message FibReply {
- optional int64 count = 1;
+ required int64 count = 1;
}
service Math {
diff --git a/src/ruby/bin/math.rb b/src/ruby/bin/math.rb
deleted file mode 100644
index 09d1e98586..0000000000
--- a/src/ruby/bin/math.rb
+++ /dev/null
@@ -1,61 +0,0 @@
-# Copyright 2014, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# source: math.proto
-
-require 'google/protobuf'
-
-Google::Protobuf::DescriptorPool.generated_pool.build do
- add_message "math.DivArgs" do
- optional :dividend, :int64, 1
- optional :divisor, :int64, 2
- end
- add_message "math.DivReply" do
- optional :quotient, :int64, 1
- optional :remainder, :int64, 2
- end
- add_message "math.FibArgs" do
- optional :limit, :int64, 1
- end
- add_message "math.Num" do
- optional :num, :int64, 1
- end
- add_message "math.FibReply" do
- optional :count, :int64, 1
- end
-end
-
-module Math
- DivArgs = Google::Protobuf::DescriptorPool.generated_pool.lookup("math.DivArgs").msgclass
- DivReply = Google::Protobuf::DescriptorPool.generated_pool.lookup("math.DivReply").msgclass
- FibArgs = Google::Protobuf::DescriptorPool.generated_pool.lookup("math.FibArgs").msgclass
- Num = Google::Protobuf::DescriptorPool.generated_pool.lookup("math.Num").msgclass
- FibReply = Google::Protobuf::DescriptorPool.generated_pool.lookup("math.FibReply").msgclass
-end
diff --git a/src/ruby/bin/math_client.rb b/src/ruby/bin/math_client.rb
index 5cba9317f4..8a62764c08 100644
--- a/src/ruby/bin/math_client.rb
+++ b/src/ruby/bin/math_client.rb
@@ -40,7 +40,7 @@ $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
require 'grpc'
-require 'math_services'
+require 'math.pb'
require 'optparse'
include GRPC::Core::TimeConsts
@@ -111,8 +111,8 @@ def main
'secure' => false
}
OptionParser.new do |opts|
- opts.banner = 'Usage: [--host <hostname>:<port>] [--secure|-s]'
- opts.on('--host HOST', '<hostname>:<port>') do |v|
+ opts.banner = 'Usage: [--host|-h <hostname>:<port>] [--secure|-s]'
+ opts.on('-h', '--host', '<hostname>:<port>') do |v|
options['host'] = v
end
opts.on('-s', '--secure', 'access using test creds') do |v|
diff --git a/src/ruby/bin/math_server.rb b/src/ruby/bin/math_server.rb
index a0f301c3e7..ed39144d7a 100644
--- a/src/ruby/bin/math_server.rb
+++ b/src/ruby/bin/math_server.rb
@@ -41,7 +41,7 @@ $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
require 'forwardable'
require 'grpc'
-require 'math_services'
+require 'math.pb'
require 'optparse'
# Holds state for a fibonacci series
@@ -168,8 +168,8 @@ def main
'secure' => false
}
OptionParser.new do |opts|
- opts.banner = 'Usage: [--host <hostname>:<port>] [--secure|-s]'
- opts.on('--host HOST', '<hostname>:<port>') do |v|
+ opts.banner = 'Usage: [--host|-h <hostname>:<port>] [--secure|-s]'
+ opts.on('-h', '--host', '<hostname>:<port>') do |v|
options['host'] = v
end
opts.on('-s', '--secure', 'access using test creds') do |v|
diff --git a/src/ruby/bin/math_services.rb b/src/ruby/bin/math_services.rb
deleted file mode 100644
index f6ca6fe060..0000000000
--- a/src/ruby/bin/math_services.rb
+++ /dev/null
@@ -1,56 +0,0 @@
-# Copyright 2014, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# Source: math.proto for package 'math'
-
-require 'grpc'
-require 'math'
-
-module Math
- module Math
-
- # TODO: add proto service documentation here
- class Service
-
- include GRPC::GenericService
-
- self.marshal_class_method = :encode
- self.unmarshal_class_method = :decode
- self.service_name = 'math.Math'
-
- rpc :Div, DivArgs, DivReply
- rpc :DivMany, stream(DivArgs), stream(DivReply)
- rpc :Fib, FibArgs, stream(Num)
- rpc :Sum, stream(Num), Num
- end
-
- Stub = Service.rpc_stub_class
- end
-end
diff --git a/src/ruby/bin/noproto_client.rb b/src/ruby/bin/noproto_client.rb
index 50ae9fb68f..29ed6d9f7c 100644
--- a/src/ruby/bin/noproto_client.rb
+++ b/src/ruby/bin/noproto_client.rb
@@ -37,68 +37,36 @@ lib_dir = File.join(File.dirname(this_dir), 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
require 'grpc'
-require 'optparse'
-class NoProtoMsg
- def self.marshal(o)
+class EchoMsg
+ def marshal
''
end
def self.unmarshal(o)
- NoProtoMsg.new
+ EchoMsg.new
end
end
-class NoProtoService
+class EchoService
include GRPC::GenericService
- rpc :AnRPC, NoProtoMsg, NoProtoMsg
-end
+ rpc :AnRPC, EchoMsg, EchoMsg
-NoProtoStub = NoProtoService.rpc_stub_class
+ def initialize(default_var='ignored')
+ end
-def load_test_certs
- this_dir = File.expand_path(File.dirname(__FILE__))
- data_dir = File.join(File.dirname(this_dir), 'spec/testdata')
- files = ['ca.pem', 'server1.key', 'server1.pem']
- files.map { |f| File.open(File.join(data_dir, f)).read }
+ def an_rpc(req, call)
+ logger.info('echo service received a request')
+ req
+ end
end
-def test_creds
- certs = load_test_certs
- creds = GRPC::Core::Credentials.new(certs[0])
-end
+EchoStub = EchoService.rpc_stub_class
def main
- options = {
- 'host' => 'localhost:7071',
- 'secure' => false
- }
- OptionParser.new do |opts|
- opts.banner = 'Usage: [--host <hostname>:<port>] [--secure|-s]'
- opts.on('--host HOST', '<hostname>:<port>') do |v|
- options['host'] = v
- end
- opts.on('-s', '--secure', 'access using test creds') do |v|
- options['secure'] = true
- end
- end.parse!
-
- if options['secure']
- stub_opts = {
- :creds => test_creds,
- GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.com',
- }
- p stub_opts
- p options['host']
- stub = NoProtoStub.new(options['host'], **stub_opts)
- logger.info("... connecting securely on #{options['host']}")
- else
- stub = NoProtoStub.new(options['host'])
- logger.info("... connecting insecurely on #{options['host']}")
- end
-
- logger.info('sending a NoProto rpc')
- resp = stub.an_rpc(NoProtoMsg.new)
+ stub = EchoStub.new('localhost:9090')
+ logger.info('sending an rpc')
+ resp = stub.an_rpc(EchoMsg.new)
logger.info("got a response: #{resp}")
end
diff --git a/src/ruby/bin/noproto_server.rb b/src/ruby/bin/noproto_server.rb
index d410827b22..7b74fa13ec 100644
--- a/src/ruby/bin/noproto_server.rb
+++ b/src/ruby/bin/noproto_server.rb
@@ -37,24 +37,23 @@ lib_dir = File.join(File.dirname(this_dir), 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
require 'grpc'
-require 'optparse'
-class NoProtoMsg
- def self.marshal(o)
+class EchoMsg
+ def marshal
''
end
def self.unmarshal(o)
- NoProtoMsg.new
+ EchoMsg.new
end
end
-class NoProtoService
+class EchoService
include GRPC::GenericService
- rpc :AnRPC, NoProtoMsg, NoProtoMsg
+ rpc :AnRPC, EchoMsg, EchoMsg
end
-class NoProto < NoProtoService
+class Echo < EchoService
def initialize(default_var='ignored')
end
@@ -64,46 +63,11 @@ class NoProto < NoProtoService
end
end
-def load_test_certs
- this_dir = File.expand_path(File.dirname(__FILE__))
- data_dir = File.join(File.dirname(this_dir), 'spec/testdata')
- files = ['ca.pem', 'server1.key', 'server1.pem']
- files.map { |f| File.open(File.join(data_dir, f)).read }
-end
-
-def test_server_creds
- certs = load_test_certs
- server_creds = GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2])
-end
-
def main
- options = {
- 'host' => 'localhost:9090',
- 'secure' => false
- }
- OptionParser.new do |opts|
- opts.banner = 'Usage: [--host <hostname>:<port>] [--secure|-s]'
- opts.on('--host HOST', '<hostname>:<port>') do |v|
- options['host'] = v
- end
- opts.on('-s', '--secure', 'access using test creds') do |v|
- options['secure'] = true
- end
- end.parse!
-
- if options['secure']
- s = GRPC::RpcServer.new(creds: test_server_creds)
- s.add_http2_port(options['host'], true)
- logger.info("... running securely on #{options['host']}")
- else
- s = GRPC::RpcServer.new
- s.add_http2_port(options['host'])
- logger.info("... running insecurely on #{options['host']}")
- end
-
- s.handle(NoProto)
+ s = GRPC::RpcServer.new()
+ s.add_http2_port('localhost:9090')
+ s.handle(Echo)
s.run
end
-
main
diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec
index 3e1dcd578b..b535de4946 100755
--- a/src/ruby/grpc.gemspec
+++ b/src/ruby/grpc.gemspec
@@ -19,7 +19,7 @@ Gem::Specification.new do |s|
s.add_dependency 'xray'
s.add_dependency 'logging', '~> 1.8'
- s.add_dependency 'google-protobuf', '~> 3.0.0alpha'
+ s.add_dependency 'beefcake', '~> 1.1'
s.add_dependency 'minitest', '~> 5.4' # not a dev dependency, used by the interop tests
s.add_development_dependency "bundler", "~> 1.7"
diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb
index 81c67ec859..c7eec3388c 100644
--- a/src/ruby/lib/grpc.rb
+++ b/src/ruby/lib/grpc.rb
@@ -27,6 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+require 'grpc/beefcake' # extends beefcake
require 'grpc/errors'
require 'grpc/grpc'
require 'grpc/logconfig'
diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb
index f3fe638fce..6a4356fab9 100644
--- a/src/ruby/lib/grpc/generic/service.rb
+++ b/src/ruby/lib/grpc/generic/service.rb
@@ -93,8 +93,6 @@ module Google::RPC
# The Dsl verifies that the types in the descriptor have both the
# unmarshal and marshal methods.
attr_writer(:marshal_class_method, :unmarshal_class_method)
-
- # This allows configuration of the service name.
attr_accessor(:service_name)
# Adds an RPC spec.
@@ -119,8 +117,8 @@ module Google::RPC
end
def inherited(subclass)
- # Each subclass should have a distinct class variable with its own
- # rpc_descs
+ # Each subclass should have distinct class variable with its own
+ # rpc_descs.
subclass.rpc_descs.merge!(rpc_descs)
subclass.service_name = service_name
end
@@ -229,9 +227,8 @@ module Google::RPC
def self.included(o)
o.extend(Dsl)
- # Update to the use the service name including module. Proivde a default
- # that can be nil e,g. when modules are declared dynamically.
- return unless o.service_name.nil?
+ # Update to the use the name including module. This can be nil e,g. when
+ # modules are declared dynamically.
if o.name.nil?
o.service_name = 'GenericService'
else
diff --git a/src/ruby/spec/generic/service_spec.rb b/src/ruby/spec/generic/service_spec.rb
index a8e0c6f52f..dc921d8934 100644
--- a/src/ruby/spec/generic/service_spec.rb
+++ b/src/ruby/spec/generic/service_spec.rb
@@ -108,39 +108,6 @@ describe GenericService do
expect(c.rpc_descs[:AnRpc]).to be_a(GRPC::RpcDesc)
end
- it 'adds a default service name' do
- c = Class.new do
- include GenericService
- end
- expect(c.service_name).to eq('GenericService')
- end
-
- it 'adds a default service name to subclasses' do
- base = Class.new do
- include GenericService
- end
- c = Class.new(base) do
- end
- expect(c.service_name).to eq('GenericService')
- end
-
- it 'adds the specified service name' do
- c = Class.new do
- include GenericService
- self.service_name = 'test.service.TestService'
- end
- expect(c.service_name).to eq('test.service.TestService')
- end
-
- it 'adds the specified service name to subclasses' do
- base = Class.new do
- include GenericService
- self.service_name = 'test.service.TestService'
- end
- c = Class.new(base) do
- end
- expect(c.service_name).to eq('test.service.TestService')
- end
end
describe '#include' do
diff --git a/templates/Makefile.template b/templates/Makefile.template
index 44144c8f7e..bdb9765114 100644
--- a/templates/Makefile.template
+++ b/templates/Makefile.template
@@ -100,7 +100,7 @@ CPPFLAGS += -g -fPIC -Wall -Werror -Wno-long-long
LDFLAGS += -g -pthread -fPIC
INCLUDES = . include gens
-LIBS = rt m z pthread
+LIBS = rt m z event event_pthreads pthread
LIBSXX = protobuf
LIBS_PROTOC = protoc protobuf
@@ -159,12 +159,20 @@ else
IS_GIT_FOLDER = true
endif
+EVENT2_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/event2.c -levent $(LDFLAGS)
OPENSSL_ALPN_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/openssl-alpn.c -lssl -lcrypto -ldl $(LDFLAGS)
ZLIB_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/zlib.c -lz $(LDFLAGS)
+HAS_SYSTEM_EVENT2 = $(shell $(EVENT2_CHECK_CMD) 2> /dev/null && echo true || echo false)
HAS_SYSTEM_OPENSSL_ALPN = $(shell $(OPENSSL_ALPN_CHECK_CMD) 2> /dev/null && echo true || echo false)
HAS_SYSTEM_ZLIB = $(shell $(ZLIB_CHECK_CMD) 2> /dev/null && echo true || echo false)
+ifeq ($(wildcard third_party/libevent/include/event2/event.h),)
+HAS_EMBEDDED_EVENT2 = false
+else
+HAS_EMBEDDED_EVENT2 = true
+endif
+
ifeq ($(wildcard third_party/openssl/ssl/ssl.h),)
HAS_EMBEDDED_OPENSSL_ALPN = false
else
@@ -177,6 +185,12 @@ else
HAS_EMBEDDED_ZLIB = true
endif
+ifneq ($(SYSTEM),MINGW32)
+ifeq ($(HAS_SYSTEM_EVENT2),false)
+DEP_MISSING += libevent
+endif
+endif
+
ifeq ($(HAS_SYSTEM_ZLIB),false)
ifeq ($(HAS_EMBEDDED_ZLIB),true)
ZLIB_DEP = third_party/zlib/libz.a
@@ -277,6 +291,7 @@ ${tgt.name}: bins/$(CONFIG)/${tgt.name}
% endfor
run_dep_checks:
+ $(EVENT2_CHECK_CMD) || true
$(OPENSSL_ALPN_CHECK_CMD) || true
$(ZLIB_CHECK_CMD) || true
diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c
index e5b7304743..1181f1b4de 100644
--- a/test/core/end2end/cq_verifier.c
+++ b/test/core/end2end/cq_verifier.c
@@ -371,12 +371,7 @@ void cq_verify_empty(cq_verifier *v) {
GPR_ASSERT(v->expect.next == &v->expect && "expectation queue must be empty");
ev = grpc_completion_queue_next(v->cq, deadline);
- if (ev != NULL) {
- char *s = grpc_event_string(ev);
- gpr_log(GPR_ERROR, "unexpected event (expected nothing): %s", s);
- gpr_free(s);
- abort();
- }
+ GPR_ASSERT(ev == NULL);
}
static expectation *add(cq_verifier *v, grpc_completion_type type, void *tag) {
diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c
index cb5c6f7cad..b3fac796f4 100644
--- a/test/core/end2end/fixtures/chttp2_socket_pair.c
+++ b/test/core/end2end/fixtures/chttp2_socket_pair.c
@@ -120,7 +120,6 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
GPR_ASSERT(!f->server);
f->server =
grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args);
- grpc_server_start(f->server);
grpc_create_chttp2_transport(server_setup_transport, f, server_args,
sfd->server, NULL, 0, grpc_mdctx_create(), 0);
}
diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
index 84acfa6d6c..061e049a09 100644
--- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
+++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
@@ -120,7 +120,6 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
GPR_ASSERT(!f->server);
f->server =
grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args);
- grpc_server_start(f->server);
grpc_create_chttp2_transport(server_setup_transport, f, server_args,
sfd->server, NULL, 0, grpc_mdctx_create(), 0);
}
diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c
index bad86fb9dc..d15fef6adc 100644
--- a/test/core/end2end/tests/invoke_large_request.c
+++ b/test/core/end2end/tests/invoke_large_request.c
@@ -112,7 +112,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) {
gpr_slice request_payload_slice = large_slice();
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
- gpr_timespec deadline = n_seconds_time(30);
+ gpr_timespec deadline = n_seconds_time(10);
grpc_end2end_test_fixture f = begin_test(config, __FUNCTION__, NULL, NULL);
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);
diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c
index a418d1b15f..08198d49fb 100644
--- a/test/core/end2end/tests/max_concurrent_streams.c
+++ b/test/core/end2end/tests/max_concurrent_streams.c
@@ -156,12 +156,9 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
grpc_call *c2;
grpc_call *s1;
grpc_call *s2;
- int live_call;
- grpc_call *live_call_obj;
gpr_timespec deadline;
cq_verifier *v_client;
cq_verifier *v_server;
- grpc_event *ev;
server_arg.key = GRPC_ARG_MAX_CONCURRENT_STREAMS;
server_arg.type = GRPC_ARG_INTEGER;
@@ -183,10 +180,9 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
/* start two requests - ensuring that the second is not accepted until
the first completes */
deadline = five_seconds_time();
- c1 =
- grpc_channel_create_call(f.client, "/alpha", "test.google.com", deadline);
+ c1 = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c1);
- c2 = grpc_channel_create_call(f.client, "/beta", "test.google.com", deadline);
+ c2 = grpc_channel_create_call(f.client, "/bar", "test.google.com", deadline);
GPR_ASSERT(c1);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100)));
@@ -195,29 +191,19 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
tag(301), tag(302), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(c2, f.client_cq, tag(400),
tag(401), tag(402), 0));
- ev = grpc_completion_queue_next(
- f.client_cq, gpr_time_add(gpr_now(), gpr_time_from_seconds(10)));
- GPR_ASSERT(ev);
- GPR_ASSERT(ev->type == GRPC_INVOKE_ACCEPTED);
- GPR_ASSERT(ev->data.invoke_accepted == GRPC_OP_OK);
- /* The /alpha or /beta calls started above could be invoked (but NOT both);
- * check this here */
- live_call = (int)(gpr_intptr)ev->tag;
- live_call_obj = live_call == 300 ? c1 : c2;
- grpc_event_finish(ev);
+ cq_expect_invoke_accepted(v_client, tag(300), GRPC_OP_OK);
+ cq_verify(v_client);
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_writes_done(live_call_obj, tag(live_call + 3)));
- cq_expect_finish_accepted(v_client, tag(live_call + 3), GRPC_OP_OK);
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c1, tag(303)));
+ cq_expect_finish_accepted(v_client, tag(303), GRPC_OP_OK);
cq_verify(v_client);
- cq_expect_server_rpc_new(v_server, &s1, tag(100),
- live_call == 300 ? "/alpha" : "/beta",
- "test.google.com", deadline, NULL);
+ cq_expect_server_rpc_new(v_server, &s1, tag(100), "/foo", "test.google.com",
+ deadline, NULL);
cq_verify(v_server);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_accept(s1, f.server_cq, tag(102), 0));
- cq_expect_client_metadata_read(v_client, tag(live_call + 1), NULL);
+ cq_expect_client_metadata_read(v_client, tag(301), NULL);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK ==
@@ -228,26 +214,22 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
cq_verify(v_server);
/* first request is finished, we should be able to start the second */
- cq_expect_finished_with_status(v_client, tag(live_call + 2),
- GRPC_STATUS_UNIMPLEMENTED, "xyz", NULL);
- live_call = (live_call == 300) ? 400 : 300;
- live_call_obj = live_call == 300 ? c1 : c2;
- cq_expect_invoke_accepted(v_client, tag(live_call), GRPC_OP_OK);
+ cq_expect_finished_with_status(v_client, tag(302), GRPC_STATUS_UNIMPLEMENTED,
+ "xyz", NULL);
+ cq_expect_invoke_accepted(v_client, tag(400), GRPC_OP_OK);
cq_verify(v_client);
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_writes_done(live_call_obj, tag(live_call + 3)));
- cq_expect_finish_accepted(v_client, tag(live_call + 3), GRPC_OP_OK);
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c2, tag(403)));
+ cq_expect_finish_accepted(v_client, tag(403), GRPC_OP_OK);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(200)));
- cq_expect_server_rpc_new(v_server, &s2, tag(200),
- live_call == 300 ? "/alpha" : "/beta",
- "test.google.com", deadline, NULL);
+ cq_expect_server_rpc_new(v_server, &s2, tag(200), "/bar", "test.google.com",
+ deadline, NULL);
cq_verify(v_server);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_accept(s2, f.server_cq, tag(202), 0));
- cq_expect_client_metadata_read(v_client, tag(live_call + 1), NULL);
+ cq_expect_client_metadata_read(v_client, tag(401), NULL);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK ==
@@ -257,8 +239,8 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
cq_expect_finished(v_server, tag(202), NULL);
cq_verify(v_server);
- cq_expect_finished_with_status(v_client, tag(live_call + 2),
- GRPC_STATUS_UNIMPLEMENTED, "xyz", NULL);
+ cq_expect_finished_with_status(v_client, tag(402), GRPC_STATUS_UNIMPLEMENTED,
+ "xyz", NULL);
cq_verify(v_client);
cq_verifier_destroy(v_client);
diff --git a/test/core/iomgr/alarm_list_test.c b/test/core/iomgr/alarm_list_test.c
index 686d21d705..a8aa6126e6 100644
--- a/test/core/iomgr/alarm_list_test.c
+++ b/test/core/iomgr/alarm_list_test.c
@@ -41,13 +41,13 @@
#define MAX_CB 30
-static int cb_called[MAX_CB][2];
+static int cb_called[MAX_CB][GRPC_CALLBACK_DO_NOT_USE];
static int kicks;
void grpc_kick_poller() { ++kicks; }
-static void cb(void *arg, int success) {
- cb_called[(gpr_intptr)arg][success]++;
+static void cb(void *arg, grpc_iomgr_cb_status status) {
+ cb_called[(gpr_intptr)arg][status]++;
}
static void add_test() {
@@ -72,36 +72,36 @@ static void add_test() {
/* collect alarms. Only the first batch should be ready. */
GPR_ASSERT(10 ==
- grpc_alarm_check(
- NULL, gpr_time_add(start, gpr_time_from_millis(500)), NULL));
+ grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(500))));
for (i = 0; i < 20; i++) {
- GPR_ASSERT(cb_called[i][1] == (i < 10));
- GPR_ASSERT(cb_called[i][0] == 0);
+ GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 10));
+ GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0);
+ GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0);
}
GPR_ASSERT(0 ==
- grpc_alarm_check(
- NULL, gpr_time_add(start, gpr_time_from_millis(600)), NULL));
+ grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(600))));
for (i = 0; i < 30; i++) {
- GPR_ASSERT(cb_called[i][1] == (i < 10));
- GPR_ASSERT(cb_called[i][0] == 0);
+ GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 10));
+ GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0);
+ GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0);
}
/* collect the rest of the alarms */
GPR_ASSERT(10 ==
- grpc_alarm_check(
- NULL, gpr_time_add(start, gpr_time_from_millis(1500)), NULL));
+ grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(1500))));
for (i = 0; i < 30; i++) {
- GPR_ASSERT(cb_called[i][1] == (i < 20));
- GPR_ASSERT(cb_called[i][0] == 0);
+ GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 20));
+ GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0);
+ GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0);
}
GPR_ASSERT(0 ==
- grpc_alarm_check(
- NULL, gpr_time_add(start, gpr_time_from_millis(1600)), NULL));
+ grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(1600))));
for (i = 0; i < 30; i++) {
- GPR_ASSERT(cb_called[i][1] == (i < 20));
- GPR_ASSERT(cb_called[i][0] == 0);
+ GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 20));
+ GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0);
+ GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0);
}
grpc_alarm_list_shutdown();
@@ -124,16 +124,16 @@ void destruction_test() {
(void *)(gpr_intptr)3, gpr_time_0);
grpc_alarm_init(&alarms[4], gpr_time_from_millis(1), cb,
(void *)(gpr_intptr)4, gpr_time_0);
- GPR_ASSERT(1 == grpc_alarm_check(NULL, gpr_time_from_millis(2), NULL));
- GPR_ASSERT(1 == cb_called[4][1]);
+ GPR_ASSERT(1 == grpc_alarm_check(gpr_time_from_millis(2)));
+ GPR_ASSERT(1 == cb_called[4][GRPC_CALLBACK_SUCCESS]);
grpc_alarm_cancel(&alarms[0]);
grpc_alarm_cancel(&alarms[3]);
- GPR_ASSERT(1 == cb_called[0][0]);
- GPR_ASSERT(1 == cb_called[3][0]);
+ GPR_ASSERT(1 == cb_called[0][GRPC_CALLBACK_CANCELLED]);
+ GPR_ASSERT(1 == cb_called[3][GRPC_CALLBACK_CANCELLED]);
grpc_alarm_list_shutdown();
- GPR_ASSERT(1 == cb_called[1][0]);
- GPR_ASSERT(1 == cb_called[2][0]);
+ GPR_ASSERT(1 == cb_called[1][GRPC_CALLBACK_CANCELLED]);
+ GPR_ASSERT(1 == cb_called[2][GRPC_CALLBACK_CANCELLED]);
}
int main(int argc, char **argv) {
diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c
index 247320de04..271c42d57e 100644
--- a/test/core/iomgr/alarm_test.c
+++ b/test/core/iomgr/alarm_test.c
@@ -51,10 +51,8 @@
#include <grpc/support/time.h>
#include "test/core/util/test_config.h"
-#define SUCCESS_NOT_SET (-1)
-
/* Dummy gRPC callback */
-void no_op_cb(void *arg, int success) {}
+void no_op_cb(void *arg, grpc_iomgr_cb_status status) {}
typedef struct {
gpr_cv cv;
@@ -64,25 +62,27 @@ typedef struct {
int done_cancel_ctr;
int done;
gpr_event fcb_arg;
- int success;
+ grpc_iomgr_cb_status status;
} alarm_arg;
-static void followup_cb(void *arg, int success) {
+static void followup_cb(void *arg, grpc_iomgr_cb_status status) {
gpr_event_set((gpr_event *)arg, arg);
}
/* Called when an alarm expires. */
-static void alarm_cb(void *arg /* alarm_arg */, int success) {
+static void alarm_cb(void *arg /* alarm_arg */, grpc_iomgr_cb_status status) {
alarm_arg *a = arg;
gpr_mu_lock(&a->mu);
- if (success) {
+ if (status == GRPC_CALLBACK_SUCCESS) {
a->counter++;
a->done_success_ctr++;
- } else {
+ } else if (status == GRPC_CALLBACK_CANCELLED) {
a->done_cancel_ctr++;
+ } else {
+ GPR_ASSERT(0);
}
a->done = 1;
- a->success = success;
+ a->status = status;
gpr_cv_signal(&a->cv);
gpr_mu_unlock(&a->mu);
grpc_iomgr_add_callback(followup_cb, &a->fcb_arg);
@@ -105,7 +105,7 @@ static void test_grpc_alarm() {
grpc_iomgr_init();
arg.counter = 0;
- arg.success = SUCCESS_NOT_SET;
+ arg.status = GRPC_CALLBACK_DO_NOT_USE;
arg.done_success_ctr = 0;
arg.done_cancel_ctr = 0;
arg.done = 0;
@@ -138,7 +138,7 @@ static void test_grpc_alarm() {
} else if (arg.done_cancel_ctr != 0) {
gpr_log(GPR_ERROR, "Alarm done callback called with cancel");
GPR_ASSERT(0);
- } else if (arg.success == SUCCESS_NOT_SET) {
+ } else if (arg.status == GRPC_CALLBACK_DO_NOT_USE) {
gpr_log(GPR_ERROR, "Alarm callback without status");
GPR_ASSERT(0);
} else {
@@ -154,7 +154,7 @@ static void test_grpc_alarm() {
gpr_mu_destroy(&arg.mu);
arg2.counter = 0;
- arg2.success = SUCCESS_NOT_SET;
+ arg2.status = GRPC_CALLBACK_DO_NOT_USE;
arg2.done_success_ctr = 0;
arg2.done_cancel_ctr = 0;
arg2.done = 0;
@@ -188,7 +188,7 @@ static void test_grpc_alarm() {
} else if (arg2.done_cancel_ctr + arg2.done_success_ctr != 1) {
gpr_log(GPR_ERROR, "Alarm done callback called incorrect number of times");
GPR_ASSERT(0);
- } else if (arg2.success == SUCCESS_NOT_SET) {
+ } else if (arg2.status == GRPC_CALLBACK_DO_NOT_USE) {
gpr_log(GPR_ERROR, "Alarm callback without status");
GPR_ASSERT(0);
} else if (arg2.done_success_ctr) {
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index 125cde4678..6a7f6afbc6 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -145,8 +145,8 @@ static void read_and_write_test_read_handler(void *data, gpr_slice *slices,
gpr_cv_signal(&state->cv);
gpr_mu_unlock(&state->mu);
} else {
- grpc_endpoint_notify_on_read(state->read_ep,
- read_and_write_test_read_handler, data);
+ grpc_endpoint_notify_on_read(
+ state->read_ep, read_and_write_test_read_handler, data, gpr_inf_future);
}
}
@@ -159,8 +159,6 @@ static void read_and_write_test_write_handler(void *data,
GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
- gpr_log(GPR_DEBUG, "%s: error=%d", __FUNCTION__, error);
-
if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
gpr_log(GPR_INFO, "Write handler shutdown");
gpr_mu_lock(&state->mu);
@@ -184,10 +182,9 @@ static void read_and_write_test_write_handler(void *data,
slices = allocate_blocks(state->current_write_size, 8192, &nslices,
&state->current_write_data);
- write_status =
- grpc_endpoint_write(state->write_ep, slices, nslices,
- read_and_write_test_write_handler, state);
- gpr_log(GPR_DEBUG, "write_status=%d", write_status);
+ write_status = grpc_endpoint_write(state->write_ep, slices, nslices,
+ read_and_write_test_write_handler, state,
+ gpr_inf_future);
GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR);
free(slices);
if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
@@ -211,7 +208,8 @@ static void read_and_write_test(grpc_endpoint_test_config config,
size_t num_bytes, size_t write_size,
size_t slice_size, int shutdown) {
struct read_and_write_test_state state;
- gpr_timespec deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(20));
+ gpr_timespec rel_deadline = {20, 0};
+ gpr_timespec deadline = gpr_time_add(gpr_now(), rel_deadline);
grpc_endpoint_test_fixture f = begin_test(config, __FUNCTION__, slice_size);
if (shutdown) {
@@ -243,22 +241,16 @@ static void read_and_write_test(grpc_endpoint_test_config config,
read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK);
grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler,
- &state);
+ &state, gpr_inf_future);
if (shutdown) {
- gpr_log(GPR_DEBUG, "shutdown read");
grpc_endpoint_shutdown(state.read_ep);
- gpr_log(GPR_DEBUG, "shutdown write");
grpc_endpoint_shutdown(state.write_ep);
}
gpr_mu_lock(&state.mu);
while (!state.read_done || !state.write_done) {
- if (gpr_cv_wait(&state.cv, &state.mu, deadline)) {
- gpr_log(GPR_ERROR, "timeout: read_done=%d, write_done=%d",
- state.read_done, state.write_done);
- abort();
- }
+ GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0);
}
gpr_mu_unlock(&state.mu);
@@ -273,6 +265,79 @@ struct timeout_test_state {
gpr_event io_done;
};
+static void read_timeout_test_read_handler(void *data, gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_cb_status error) {
+ struct timeout_test_state *state = data;
+ GPR_ASSERT(error == GRPC_ENDPOINT_CB_TIMED_OUT);
+ gpr_event_set(&state->io_done, (void *)1);
+}
+
+static void read_timeout_test(grpc_endpoint_test_config config,
+ size_t slice_size) {
+ gpr_timespec timeout = gpr_time_from_micros(10000);
+ gpr_timespec read_deadline = gpr_time_add(gpr_now(), timeout);
+ gpr_timespec test_deadline =
+ gpr_time_add(gpr_now(), gpr_time_from_micros(2000000));
+ struct timeout_test_state state;
+ grpc_endpoint_test_fixture f = begin_test(config, __FUNCTION__, slice_size);
+
+ gpr_event_init(&state.io_done);
+
+ grpc_endpoint_notify_on_read(f.client_ep, read_timeout_test_read_handler,
+ &state, read_deadline);
+ GPR_ASSERT(gpr_event_wait(&state.io_done, test_deadline));
+ grpc_endpoint_destroy(f.client_ep);
+ grpc_endpoint_destroy(f.server_ep);
+ end_test(config);
+}
+
+static void write_timeout_test_write_handler(void *data,
+ grpc_endpoint_cb_status error) {
+ struct timeout_test_state *state = data;
+ GPR_ASSERT(error == GRPC_ENDPOINT_CB_TIMED_OUT);
+ gpr_event_set(&state->io_done, (void *)1);
+}
+
+static void write_timeout_test(grpc_endpoint_test_config config,
+ size_t slice_size) {
+ gpr_timespec timeout = gpr_time_from_micros(10000);
+ gpr_timespec write_deadline = gpr_time_add(gpr_now(), timeout);
+ gpr_timespec test_deadline =
+ gpr_time_add(gpr_now(), gpr_time_from_micros(2000000));
+ struct timeout_test_state state;
+ int current_data = 1;
+ gpr_slice *slices;
+ size_t nblocks;
+ size_t size;
+ grpc_endpoint_test_fixture f = begin_test(config, __FUNCTION__, slice_size);
+
+ gpr_event_init(&state.io_done);
+
+ /* TODO(klempner): Factor this out with the equivalent code in tcp_test.c */
+ for (size = 1;; size *= 2) {
+ slices = allocate_blocks(size, 1, &nblocks, &current_data);
+ switch (grpc_endpoint_write(f.client_ep, slices, nblocks,
+ write_timeout_test_write_handler, &state,
+ write_deadline)) {
+ case GRPC_ENDPOINT_WRITE_DONE:
+ break;
+ case GRPC_ENDPOINT_WRITE_ERROR:
+ gpr_log(GPR_ERROR, "error writing");
+ abort();
+ case GRPC_ENDPOINT_WRITE_PENDING:
+ GPR_ASSERT(gpr_event_wait(&state.io_done, test_deadline));
+ gpr_free(slices);
+ goto exit;
+ }
+ gpr_free(slices);
+ }
+exit:
+ grpc_endpoint_destroy(f.client_ep);
+ grpc_endpoint_destroy(f.server_ep);
+ end_test(config);
+}
+
typedef struct {
gpr_event ev;
grpc_endpoint *ep;
@@ -292,8 +357,9 @@ static void shutdown_during_write_test_read_handler(
grpc_endpoint_destroy(st->ep);
gpr_event_set(&st->ev, (void *)(gpr_intptr)error);
} else {
- grpc_endpoint_notify_on_read(
- st->ep, shutdown_during_write_test_read_handler, user_data);
+ grpc_endpoint_notify_on_read(st->ep,
+ shutdown_during_write_test_read_handler,
+ user_data, gpr_inf_future);
}
}
@@ -331,13 +397,14 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
gpr_event_init(&read_st.ev);
gpr_event_init(&write_st.ev);
- grpc_endpoint_notify_on_read(
- read_st.ep, shutdown_during_write_test_read_handler, &read_st);
+ grpc_endpoint_notify_on_read(read_st.ep,
+ shutdown_during_write_test_read_handler,
+ &read_st, gpr_inf_future);
for (size = 1;; size *= 2) {
slices = allocate_blocks(size, 1, &nblocks, &current_data);
switch (grpc_endpoint_write(write_st.ep, slices, nblocks,
shutdown_during_write_test_write_handler,
- &write_st)) {
+ &write_st, gpr_inf_future)) {
case GRPC_ENDPOINT_WRITE_DONE:
break;
case GRPC_ENDPOINT_WRITE_ERROR:
@@ -365,5 +432,7 @@ void grpc_endpoint_tests(grpc_endpoint_test_config config) {
read_and_write_test(config, 10000000, 100000, 8192, 0);
read_and_write_test(config, 1000000, 100000, 1, 0);
read_and_write_test(config, 100000000, 100000, 1, 1);
+ read_timeout_test(config, 1000);
+ write_timeout_test(config, 1000);
shutdown_during_write_test(config, 1000);
}
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index 325c9f0221..c3a0afdb25 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -31,7 +31,8 @@
*
*/
-#include "src/core/iomgr/fd_posix.h"
+/* Test gRPC event manager with a simple TCP upload server and client. */
+#include "src/core/iomgr/iomgr_libevent.h"
#include <ctype.h>
#include <errno.h>
@@ -84,7 +85,7 @@ static void create_test_socket(int port, int *socket_fd,
}
/* Dummy gRPC callback */
-void no_op_cb(void *arg, int success) {}
+void no_op_cb(void *arg, enum grpc_em_cb_status status) {}
/* =======An upload server to test notify_on_read===========
The server simply reads and counts a stream of bytes. */
@@ -116,10 +117,10 @@ typedef struct {
/* Called when an upload session can be safely shutdown.
Close session FD and start to shutdown listen FD. */
static void session_shutdown_cb(void *arg, /*session*/
- int success) {
+ enum grpc_em_cb_status status) {
session *se = arg;
server *sv = se->sv;
- grpc_fd_orphan(se->em_fd, NULL, NULL);
+ grpc_fd_destroy(se->em_fd, NULL, NULL);
gpr_free(se);
/* Start to shutdown listen fd. */
grpc_fd_shutdown(sv->em_fd);
@@ -127,15 +128,15 @@ static void session_shutdown_cb(void *arg, /*session*/
/* Called when data become readable in a session. */
static void session_read_cb(void *arg, /*session*/
- int success) {
+ enum grpc_em_cb_status status) {
session *se = arg;
- int fd = se->em_fd->fd;
+ int fd = grpc_fd_get(se->em_fd);
ssize_t read_once = 0;
ssize_t read_total = 0;
- if (!success) {
- session_shutdown_cb(arg, 1);
+ if (status == GRPC_CALLBACK_CANCELLED) {
+ session_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS);
return;
}
@@ -150,7 +151,8 @@ static void session_read_cb(void *arg, /*session*/
It is possible to read nothing due to spurious edge event or data has
been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
if (read_once == 0) {
- session_shutdown_cb(arg, 1);
+ grpc_fd_shutdown(se->em_fd);
+ grpc_fd_notify_on_read(se->em_fd, session_read_cb, se, gpr_inf_future);
} else if (read_once == -1) {
if (errno == EAGAIN) {
/* An edge triggered event is cached in the kernel until next poll.
@@ -161,7 +163,8 @@ static void session_read_cb(void *arg, /*session*/
TODO(chenw): in multi-threaded version, callback and polling can be
run in different threads. polling may catch a persist read edge event
before notify_on_read is called. */
- grpc_fd_notify_on_read(se->em_fd, session_read_cb, se);
+ GPR_ASSERT(grpc_fd_notify_on_read(se->em_fd, session_read_cb, se,
+ gpr_inf_future));
} else {
gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
GPR_ASSERT(0);
@@ -171,10 +174,11 @@ static void session_read_cb(void *arg, /*session*/
/* Called when the listen FD can be safely shutdown.
Close listen FD and signal that server can be shutdown. */
-static void listen_shutdown_cb(void *arg /*server*/, int success) {
+static void listen_shutdown_cb(void *arg /*server*/,
+ enum grpc_em_cb_status status) {
server *sv = arg;
- grpc_fd_orphan(sv->em_fd, NULL, NULL);
+ grpc_fd_destroy(sv->em_fd, NULL, NULL);
gpr_mu_lock(&sv->mu);
sv->done = 1;
@@ -184,21 +188,21 @@ static void listen_shutdown_cb(void *arg /*server*/, int success) {
/* Called when a new TCP connection request arrives in the listening port. */
static void listen_cb(void *arg, /*=sv_arg*/
- int success) {
+ enum grpc_em_cb_status status) {
server *sv = arg;
int fd;
int flags;
session *se;
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
- grpc_fd *listen_em_fd = sv->em_fd;
+ struct grpc_fd *listen_em_fd = sv->em_fd;
- if (!success) {
- listen_shutdown_cb(arg, 1);
+ if (status == GRPC_CALLBACK_CANCELLED) {
+ listen_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS);
return;
}
- fd = accept(listen_em_fd->fd, (struct sockaddr *)&ss, &slen);
+ fd = accept(grpc_fd_get(listen_em_fd), (struct sockaddr *)&ss, &slen);
GPR_ASSERT(fd >= 0);
GPR_ASSERT(fd < FD_SETSIZE);
flags = fcntl(fd, F_GETFL, 0);
@@ -206,9 +210,11 @@ static void listen_cb(void *arg, /*=sv_arg*/
se = gpr_malloc(sizeof(*se));
se->sv = sv;
se->em_fd = grpc_fd_create(fd);
- grpc_fd_notify_on_read(se->em_fd, session_read_cb, se);
+ GPR_ASSERT(
+ grpc_fd_notify_on_read(se->em_fd, session_read_cb, se, gpr_inf_future));
- grpc_fd_notify_on_read(listen_em_fd, listen_cb, sv);
+ GPR_ASSERT(
+ grpc_fd_notify_on_read(listen_em_fd, listen_cb, sv, gpr_inf_future));
}
/* Max number of connections pending to be accepted by listen(). */
@@ -233,7 +239,7 @@ static int server_start(server *sv) {
sv->em_fd = grpc_fd_create(fd);
/* Register to be interested in reading from listen_fd. */
- grpc_fd_notify_on_read(sv->em_fd, listen_cb, sv);
+ GPR_ASSERT(grpc_fd_notify_on_read(sv->em_fd, listen_cb, sv, gpr_inf_future));
return port;
}
@@ -279,24 +285,25 @@ static void client_init(client *cl) {
}
/* Called when a client upload session is ready to shutdown. */
-static void client_session_shutdown_cb(void *arg /*client*/, int success) {
+static void client_session_shutdown_cb(void *arg /*client*/,
+ enum grpc_em_cb_status status) {
client *cl = arg;
- grpc_fd_orphan(cl->em_fd, NULL, NULL);
+ grpc_fd_destroy(cl->em_fd, NULL, NULL);
+ gpr_mu_lock(&cl->mu);
cl->done = 1;
gpr_cv_signal(&cl->done_cv);
+ gpr_mu_unlock(&cl->mu);
}
/* Write as much as possible, then register notify_on_write. */
static void client_session_write(void *arg, /*client*/
- int success) {
+ enum grpc_em_cb_status status) {
client *cl = arg;
- int fd = cl->em_fd->fd;
+ int fd = grpc_fd_get(cl->em_fd);
ssize_t write_once = 0;
- if (!success) {
- gpr_mu_lock(&cl->mu);
- client_session_shutdown_cb(arg, 1);
- gpr_mu_unlock(&cl->mu);
+ if (status == GRPC_CALLBACK_CANCELLED) {
+ client_session_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS);
return;
}
@@ -308,10 +315,14 @@ static void client_session_write(void *arg, /*client*/
if (errno == EAGAIN) {
gpr_mu_lock(&cl->mu);
if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
- grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl);
+ GPR_ASSERT(grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl,
+ gpr_inf_future));
cl->client_write_cnt++;
} else {
- client_session_shutdown_cb(arg, 1);
+ close(fd);
+ grpc_fd_shutdown(cl->em_fd);
+ grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl,
+ gpr_inf_future);
}
gpr_mu_unlock(&cl->mu);
} else {
@@ -333,7 +344,7 @@ static void client_start(client *cl, int port) {
cl->em_fd = grpc_fd_create(fd);
- client_session_write(cl, 1);
+ client_session_write(cl, GRPC_CALLBACK_SUCCESS);
}
/* Wait for the signal to shutdown a client. */
@@ -367,7 +378,7 @@ static void test_grpc_fd() {
typedef struct fd_change_data {
gpr_mu mu;
gpr_cv cv;
- void (*cb_that_ran)(void *, int success);
+ void (*cb_that_ran)(void *, enum grpc_em_cb_status);
} fd_change_data;
void init_change_data(fd_change_data *fdc) {
@@ -381,7 +392,8 @@ void destroy_change_data(fd_change_data *fdc) {
gpr_cv_destroy(&fdc->cv);
}
-static void first_read_callback(void *arg /* fd_change_data */, int success) {
+static void first_read_callback(void *arg /* fd_change_data */,
+ enum grpc_em_cb_status status) {
fd_change_data *fdc = arg;
gpr_mu_lock(&fdc->mu);
@@ -390,7 +402,8 @@ static void first_read_callback(void *arg /* fd_change_data */, int success) {
gpr_mu_unlock(&fdc->mu);
}
-static void second_read_callback(void *arg /* fd_change_data */, int success) {
+static void second_read_callback(void *arg /* fd_change_data */,
+ enum grpc_em_cb_status status) {
fd_change_data *fdc = arg;
gpr_mu_lock(&fdc->mu);
@@ -423,7 +436,7 @@ static void test_grpc_fd_change() {
em_fd = grpc_fd_create(sv[0]);
/* Register the first callback, then make its FD readable */
- grpc_fd_notify_on_read(em_fd, first_read_callback, &a);
+ grpc_fd_notify_on_read(em_fd, first_read_callback, &a, gpr_inf_future);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -442,7 +455,7 @@ static void test_grpc_fd_change() {
/* Now register a second callback with distinct change data, and do the same
thing again. */
- grpc_fd_notify_on_read(em_fd, second_read_callback, &b);
+ grpc_fd_notify_on_read(em_fd, second_read_callback, &b, gpr_inf_future);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -455,9 +468,48 @@ static void test_grpc_fd_change() {
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(&b.mu);
- grpc_fd_orphan(em_fd, NULL, NULL);
+ grpc_fd_destroy(em_fd, NULL, NULL);
destroy_change_data(&a);
destroy_change_data(&b);
+ close(sv[0]);
+ close(sv[1]);
+}
+
+void timeout_callback(void *arg, enum grpc_em_cb_status status) {
+ if (status == GRPC_CALLBACK_TIMED_OUT) {
+ gpr_event_set(arg, (void *)1);
+ } else {
+ gpr_event_set(arg, (void *)2);
+ }
+}
+
+void test_grpc_fd_notify_timeout() {
+ grpc_fd *em_fd;
+ gpr_event ev;
+ int flags;
+ int sv[2];
+ gpr_timespec timeout;
+ gpr_timespec deadline;
+
+ gpr_event_init(&ev);
+
+ GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
+ flags = fcntl(sv[0], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
+ flags = fcntl(sv[1], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
+
+ em_fd = grpc_fd_create(sv[0]);
+
+ timeout = gpr_time_from_micros(1000000);
+ deadline = gpr_time_add(gpr_now(), timeout);
+
+ grpc_fd_notify_on_read(em_fd, timeout_callback, &ev, deadline);
+
+ GPR_ASSERT(gpr_event_wait(&ev, gpr_time_add(deadline, timeout)));
+
+ GPR_ASSERT(gpr_event_get(&ev) == (void *)1);
+ grpc_fd_destroy(em_fd, NULL, NULL);
close(sv[1]);
}
@@ -466,6 +518,7 @@ int main(int argc, char **argv) {
grpc_iomgr_init();
test_grpc_fd();
test_grpc_fd_change();
+ test_grpc_fd_notify_timeout();
grpc_iomgr_shutdown();
return 0;
}
diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c
index 2d0a89a1f5..cb1cd0bc16 100644
--- a/test/core/iomgr/tcp_client_posix_test.c
+++ b/test/core/iomgr/tcp_client_posix_test.c
@@ -44,7 +44,7 @@
#include <grpc/support/time.h>
static gpr_timespec test_deadline() {
- return gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
+ return gpr_time_add(gpr_now(), gpr_time_from_micros(1000000));
}
static void must_succeed(void *arg, grpc_endpoint *tcp) {
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 7fd2567cec..6a4ef0f984 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -154,7 +154,7 @@ static void read_cb(void *user_data, gpr_slice *slices, size_t nslices,
if (state->read_bytes >= state->target_read_bytes) {
gpr_cv_signal(&state->cv);
} else {
- grpc_endpoint_notify_on_read(state->ep, read_cb, state);
+ grpc_endpoint_notify_on_read(state->ep, read_cb, state, gpr_inf_future);
}
gpr_mu_unlock(&state->mu);
}
@@ -183,7 +183,7 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
- grpc_endpoint_notify_on_read(ep, read_cb, &state);
+ grpc_endpoint_notify_on_read(ep, read_cb, &state, gpr_inf_future);
gpr_mu_lock(&state.mu);
for (;;) {
@@ -225,7 +225,7 @@ static void large_read_test(ssize_t slice_size) {
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
- grpc_endpoint_notify_on_read(ep, read_cb, &state);
+ grpc_endpoint_notify_on_read(ep, read_cb, &state, gpr_inf_future);
gpr_mu_lock(&state.mu);
for (;;) {
@@ -363,8 +363,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
- if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state) ==
- GRPC_ENDPOINT_WRITE_DONE) {
+ if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state,
+ gpr_inf_future) == GRPC_ENDPOINT_WRITE_DONE) {
/* Write completed immediately */
read_bytes = drain_socket(sv[0]);
GPR_ASSERT(read_bytes == num_bytes);
@@ -421,13 +421,15 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
- switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state)) {
+ switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state,
+ gpr_inf_future)) {
case GRPC_ENDPOINT_WRITE_DONE:
case GRPC_ENDPOINT_WRITE_ERROR:
/* Write completed immediately */
break;
case GRPC_ENDPOINT_WRITE_PENDING:
- grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL);
+ grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL,
+ gpr_inf_future);
gpr_mu_lock(&state.mu);
for (;;) {
if (state.write_done) {
diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c
index f30ff917cb..cb77a88062 100644
--- a/test/core/iomgr/tcp_server_posix_test.c
+++ b/test/core/iomgr/tcp_server_posix_test.c
@@ -66,7 +66,7 @@ static void test_no_op() {
static void test_no_op_with_start() {
grpc_tcp_server *s = grpc_tcp_server_create();
LOG_TEST();
- grpc_tcp_server_start(s, NULL, on_connect, NULL);
+ grpc_tcp_server_start(s, on_connect, NULL);
grpc_tcp_server_destroy(s);
}
@@ -93,7 +93,7 @@ static void test_no_op_with_port_and_start() {
GPR_ASSERT(
grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)));
- grpc_tcp_server_start(s, NULL, on_connect, NULL);
+ grpc_tcp_server_start(s, on_connect, NULL);
grpc_tcp_server_destroy(s);
}
@@ -120,7 +120,7 @@ static void test_connect(int n) {
GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0);
GPR_ASSERT(addr_len <= sizeof(addr));
- grpc_tcp_server_start(s, NULL, on_connect, NULL);
+ grpc_tcp_server_start(s, on_connect, NULL);
for (i = 0; i < n; i++) {
deadline = gpr_time_add(gpr_now(), gpr_time_from_micros(10000000));
diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c
index d4baa64725..9311d6ba11 100644
--- a/test/core/security/secure_endpoint_test.c
+++ b/test/core/security/secure_endpoint_test.c
@@ -153,7 +153,8 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
int verified = 0;
gpr_log(GPR_INFO, "Start test left over");
- grpc_endpoint_notify_on_read(f.client_ep, verify_leftover, &verified);
+ grpc_endpoint_notify_on_read(f.client_ep, verify_leftover, &verified,
+ gpr_inf_future);
GPR_ASSERT(verified == 1);
grpc_endpoint_shutdown(f.client_ep);
@@ -186,7 +187,7 @@ static void test_destroy_ep_early(grpc_endpoint_test_config config,
grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
gpr_log(GPR_INFO, "Start test destroy early");
- grpc_endpoint_notify_on_read(f.client_ep, destroy_early, &f);
+ grpc_endpoint_notify_on_read(f.client_ep, destroy_early, &f, gpr_inf_future);
grpc_endpoint_shutdown(f.server_ep);
grpc_endpoint_destroy(f.server_ep);
diff --git a/vsprojects/vs2013/grpc.vcxproj b/vsprojects/vs2013/grpc.vcxproj
index 6cc5a8b293..ffce5f6ecd 100644
--- a/vsprojects/vs2013/grpc.vcxproj
+++ b/vsprojects/vs2013/grpc.vcxproj
@@ -115,12 +115,10 @@
<ClInclude Include="..\..\src\core\iomgr\alarm_internal.h" />
<ClInclude Include="..\..\src\core\iomgr\endpoint.h" />
<ClInclude Include="..\..\src\core\iomgr\endpoint_pair.h" />
- <ClInclude Include="..\..\src\core\iomgr\fd_posix.h" />
+ <ClInclude Include="..\..\src\core\iomgr\iomgr_completion_queue_interface.h" />
<ClInclude Include="..\..\src\core\iomgr\iomgr.h" />
- <ClInclude Include="..\..\src\core\iomgr\iomgr_internal.h" />
- <ClInclude Include="..\..\src\core\iomgr\iomgr_posix.h" />
+ <ClInclude Include="..\..\src\core\iomgr\iomgr_libevent.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset.h" />
- <ClInclude Include="..\..\src\core\iomgr\pollset_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\resolve_address.h" />
<ClInclude Include="..\..\src\core\iomgr\sockaddr.h" />
<ClInclude Include="..\..\src\core\iomgr\sockaddr_posix.h" />
@@ -145,9 +143,9 @@
<ClInclude Include="..\..\src\core\surface\server.h" />
<ClInclude Include="..\..\src\core\surface\surface_trace.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\bin_encoder.h" />
- <ClInclude Include="..\..\src\core\transport\chttp2\frame.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_data.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_goaway.h" />
+ <ClInclude Include="..\..\src\core\transport\chttp2\frame.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_ping.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_rst_stream.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_settings.h" />
@@ -160,8 +158,8 @@
<ClInclude Include="..\..\src\core\transport\chttp2\stream_encoder.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\stream_map.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\timeout_encoding.h" />
- <ClInclude Include="..\..\src\core\transport\chttp2\varint.h" />
<ClInclude Include="..\..\src\core\transport\chttp2_transport.h" />
+ <ClInclude Include="..\..\src\core\transport\chttp2\varint.h" />
<ClInclude Include="..\..\src\core\transport\metadata.h" />
<ClInclude Include="..\..\src\core\transport\stream_op.h" />
<ClInclude Include="..\..\src\core\transport\transport.h" />
@@ -238,15 +236,11 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\endpoint_pair_posix.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\iomgr\fd_posix.c">
- </ClCompile>
- <ClCompile Include="..\..\src\core\iomgr\iomgr.c">
+ <ClCompile Include="..\..\src\core\iomgr\iomgr_libevent.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\iomgr\iomgr_posix.c">
+ <ClCompile Include="..\..\src\core\iomgr\iomgr_libevent_use_threads.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">
- </ClCompile>
- <ClCompile Include="..\..\src\core\iomgr\pollset_posix.c">
+ <ClCompile Include="..\..\src\core\iomgr\pollset.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\resolve_address_posix.c">
</ClCompile>
@@ -338,10 +332,10 @@
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\timeout_encoding.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\transport\chttp2\varint.c">
- </ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2_transport.c">
</ClCompile>
+ <ClCompile Include="..\..\src\core\transport\chttp2\varint.c">
+ </ClCompile>
<ClCompile Include="..\..\src\core\transport\metadata.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\stream_op.c">
diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj b/vsprojects/vs2013/grpc_unsecure.vcxproj
index 6cc5a8b293..ffce5f6ecd 100644
--- a/vsprojects/vs2013/grpc_unsecure.vcxproj
+++ b/vsprojects/vs2013/grpc_unsecure.vcxproj
@@ -115,12 +115,10 @@
<ClInclude Include="..\..\src\core\iomgr\alarm_internal.h" />
<ClInclude Include="..\..\src\core\iomgr\endpoint.h" />
<ClInclude Include="..\..\src\core\iomgr\endpoint_pair.h" />
- <ClInclude Include="..\..\src\core\iomgr\fd_posix.h" />
+ <ClInclude Include="..\..\src\core\iomgr\iomgr_completion_queue_interface.h" />
<ClInclude Include="..\..\src\core\iomgr\iomgr.h" />
- <ClInclude Include="..\..\src\core\iomgr\iomgr_internal.h" />
- <ClInclude Include="..\..\src\core\iomgr\iomgr_posix.h" />
+ <ClInclude Include="..\..\src\core\iomgr\iomgr_libevent.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset.h" />
- <ClInclude Include="..\..\src\core\iomgr\pollset_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\resolve_address.h" />
<ClInclude Include="..\..\src\core\iomgr\sockaddr.h" />
<ClInclude Include="..\..\src\core\iomgr\sockaddr_posix.h" />
@@ -145,9 +143,9 @@
<ClInclude Include="..\..\src\core\surface\server.h" />
<ClInclude Include="..\..\src\core\surface\surface_trace.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\bin_encoder.h" />
- <ClInclude Include="..\..\src\core\transport\chttp2\frame.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_data.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_goaway.h" />
+ <ClInclude Include="..\..\src\core\transport\chttp2\frame.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_ping.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_rst_stream.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_settings.h" />
@@ -160,8 +158,8 @@
<ClInclude Include="..\..\src\core\transport\chttp2\stream_encoder.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\stream_map.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\timeout_encoding.h" />
- <ClInclude Include="..\..\src\core\transport\chttp2\varint.h" />
<ClInclude Include="..\..\src\core\transport\chttp2_transport.h" />
+ <ClInclude Include="..\..\src\core\transport\chttp2\varint.h" />
<ClInclude Include="..\..\src\core\transport\metadata.h" />
<ClInclude Include="..\..\src\core\transport\stream_op.h" />
<ClInclude Include="..\..\src\core\transport\transport.h" />
@@ -238,15 +236,11 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\endpoint_pair_posix.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\iomgr\fd_posix.c">
- </ClCompile>
- <ClCompile Include="..\..\src\core\iomgr\iomgr.c">
+ <ClCompile Include="..\..\src\core\iomgr\iomgr_libevent.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\iomgr\iomgr_posix.c">
+ <ClCompile Include="..\..\src\core\iomgr\iomgr_libevent_use_threads.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">
- </ClCompile>
- <ClCompile Include="..\..\src\core\iomgr\pollset_posix.c">
+ <ClCompile Include="..\..\src\core\iomgr\pollset.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\resolve_address_posix.c">
</ClCompile>
@@ -338,10 +332,10 @@
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\timeout_encoding.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\transport\chttp2\varint.c">
- </ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2_transport.c">
</ClCompile>
+ <ClCompile Include="..\..\src\core\transport\chttp2\varint.c">
+ </ClCompile>
<ClCompile Include="..\..\src\core\transport\metadata.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\stream_op.c">