aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/iomgr/iocp_windows.c20
-rw-r--r--src/core/iomgr/iocp_windows.h1
-rw-r--r--src/core/iomgr/socket_windows.c41
-rw-r--r--src/core/iomgr/socket_windows.h16
-rw-r--r--src/core/iomgr/tcp_client_windows.c9
-rw-r--r--src/core/iomgr/tcp_server_windows.c6
-rw-r--r--src/core/iomgr/tcp_windows.c114
-rw-r--r--src/core/transport/chttp2/internal.h3
-rw-r--r--src/core/transport/chttp2_transport.c32
-rw-r--r--test/core/iomgr/endpoint_pair_test.c7
-rw-r--r--test/core/iomgr/endpoint_tests.c111
11 files changed, 100 insertions, 260 deletions
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 09a457dd9a..0799622497 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -52,7 +52,6 @@ static OVERLAPPED g_iocp_custom_overlap;
static gpr_event g_shutdown_iocp;
static gpr_event g_iocp_done;
-static gpr_atm g_orphans = 0;
static gpr_atm g_custom_events = 0;
static HANDLE g_iocp;
@@ -92,22 +91,13 @@ static void do_iocp_work() {
gpr_log(GPR_ERROR, "Unknown IOCP operation");
abort();
}
- GPR_ASSERT(info->outstanding);
- if (socket->orphan) {
- info->outstanding = 0;
- if (!socket->read_info.outstanding && !socket->write_info.outstanding) {
- grpc_winsocket_destroy(socket);
- gpr_atm_full_fetch_add(&g_orphans, -1);
- }
- return;
- }
success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
FALSE, &flags);
info->bytes_transfered = bytes;
info->wsa_error = success ? 0 : WSAGetLastError();
GPR_ASSERT(overlapped == &info->overlapped);
- gpr_mu_lock(&socket->state_mu);
GPR_ASSERT(!info->has_pending_iocp);
+ gpr_mu_lock(&socket->state_mu);
if (info->cb) {
f = info->cb;
opaque = info->opaque;
@@ -120,7 +110,7 @@ static void do_iocp_work() {
}
static void iocp_loop(void *p) {
- while (gpr_atm_acq_load(&g_orphans) || gpr_atm_acq_load(&g_custom_events) ||
+ while (gpr_atm_acq_load(&g_custom_events) ||
!gpr_event_get(&g_shutdown_iocp)) {
grpc_maybe_call_delayed_callbacks(NULL, 1);
do_iocp_work();
@@ -175,12 +165,6 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
GPR_ASSERT(ret == g_iocp);
}
-void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
- GPR_ASSERT(!socket->orphan);
- gpr_atm_full_fetch_add(&g_orphans, 1);
- socket->orphan = 1;
-}
-
/* Calling notify_on_read or write means either of two things:
-) The IOCP already completed in the background, and we need to call
the callback now.
diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h
index ee3847a229..7d2dc45176 100644
--- a/src/core/iomgr/iocp_windows.h
+++ b/src/core/iomgr/iocp_windows.h
@@ -42,7 +42,6 @@ void grpc_iocp_init(void);
void grpc_iocp_kick(void);
void grpc_iocp_shutdown(void);
void grpc_iocp_add_socket(grpc_winsocket *);
-void grpc_iocp_socket_orphan(grpc_winsocket *);
void grpc_socket_notify_on_write(grpc_winsocket *,
void (*cb)(void *, int success), void *opaque);
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index 7d8421376b..dbc64637b5 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -62,46 +62,13 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) {
operations to abort them. We need to do that this way because of the
various callsites of that function, which happens to be in various
mutex hold states, and that'd be unsafe to call them directly. */
-int grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
- int callbacks_set = 0;
- SOCKET socket;
- gpr_mu_lock(&winsocket->state_mu);
- socket = winsocket->socket;
- if (winsocket->read_info.cb) {
- callbacks_set++;
- grpc_iomgr_closure_init(&winsocket->shutdown_closure,
- winsocket->read_info.cb,
- winsocket->read_info.opaque);
- grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
- }
- if (winsocket->write_info.cb) {
- callbacks_set++;
- grpc_iomgr_closure_init(&winsocket->shutdown_closure,
- winsocket->write_info.cb,
- winsocket->write_info.opaque);
- grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
- }
- gpr_mu_unlock(&winsocket->state_mu);
- closesocket(socket);
- return callbacks_set;
-}
-
-/* Abandons a socket. Either we're going to queue it up for garbage collecting
- from the IO Completion Port thread, or destroy it immediately. Note that this
- mechanisms assumes that we're either always waiting for an operation, or we
- explicitly know that we don't. If there is a future case where we can have
- an "idle" socket which is neither trying to read or write, we'd start leaking
- both memory and sockets. */
-void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
- grpc_iomgr_unregister_object(&winsocket->iomgr_object);
- if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) {
- grpc_iocp_socket_orphan(winsocket);
- } else {
- grpc_winsocket_destroy(winsocket);
- }
+void grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
+ shutdown(winsocket->socket, SD_BOTH);
}
void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
+ closesocket(winsocket->socket);
+ grpc_iomgr_unregister_object(&winsocket->iomgr_object);
gpr_mu_destroy(&winsocket->state_mu);
gpr_free(winsocket);
}
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index ecf2530173..498921e0fd 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -68,8 +68,6 @@ typedef struct grpc_winsocket_callback_info {
/* The results of the overlapped operation. */
DWORD bytes_transfered;
int wsa_error;
- /* A boolean indicating that we started an operation. */
- int outstanding;
} grpc_winsocket_callback_info;
/* This is a wrapper to a Windows socket. A socket can have one outstanding
@@ -92,10 +90,6 @@ typedef struct grpc_winsocket {
/* You can't add the same socket twice to the same IO Completion Port.
This prevents that. */
int added_to_iocp;
- /* A boolean to indicate that the caller has abandoned that socket, but
- there is a pending operation that the IO Completion Port will have to
- wait for. The socket will be collected at that time. */
- int orphan;
grpc_iomgr_closure shutdown_closure;
@@ -108,14 +102,10 @@ typedef struct grpc_winsocket {
grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name);
/* Initiate an asynchronous shutdown of the socket. Will call off any pending
- operation to cancel them. Returns the number of callbacks that got setup. */
-int grpc_winsocket_shutdown(grpc_winsocket *socket);
+ operation to cancel them. */
+void grpc_winsocket_shutdown(grpc_winsocket *socket);
-/* Abandon a socket. */
-void grpc_winsocket_orphan(grpc_winsocket *socket);
-
-/* Destroy a socket. Should only be called by the IO Completion Port thread,
- or by grpc_winsocket_orphan if there's no pending operation. */
+/* Destroy a socket. Should only be called if there's no pending operation. */
void grpc_winsocket_destroy(grpc_winsocket *socket);
#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index 79a58fe2af..665ef2885f 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -102,7 +102,6 @@ static void on_connect(void *acp, int from_iocp) {
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
&transfered_bytes, FALSE, &flags);
- info->outstanding = 0;
GPR_ASSERT(transfered_bytes == 0);
if (!wsa_success) {
char *utf8_message = gpr_format_message(WSAGetLastError());
@@ -125,12 +124,10 @@ static void on_connect(void *acp, int from_iocp) {
return;
}
- ac->socket->write_info.outstanding = 0;
-
/* If we don't have an endpoint, it means the connection failed,
so it doesn't matter if it aborted or failed. We need to orphan
that socket. */
- if (!ep || aborted) grpc_winsocket_orphan(ac->socket);
+ if (!ep || aborted) grpc_winsocket_destroy(ac->socket);
async_connect_cleanup(ac);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
@@ -196,7 +193,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
socket = grpc_winsocket_create(sock, "client");
info = &socket->write_info;
- info->outstanding = 1;
success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped);
/* It wouldn't be unusual to get a success immediately. But we'll still get
@@ -220,7 +216,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac,
gpr_now(GPR_CLOCK_MONOTONIC));
- socket->write_info.outstanding = 1;
grpc_socket_notify_on_write(socket, on_connect, ac);
return;
@@ -229,7 +224,7 @@ failure:
gpr_log(GPR_ERROR, message, utf8_message);
gpr_free(utf8_message);
if (socket) {
- grpc_winsocket_orphan(socket);
+ grpc_winsocket_destroy(socket);
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index d0478d3604..79ea223d54 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -116,7 +116,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
sp->shutting_down = 1;
- s->iomgr_callbacks_pending += grpc_winsocket_shutdown(sp->socket);
+ grpc_winsocket_shutdown(sp->socket);
}
/* This happens asynchronously. Wait while that happens. */
while (s->active_ports || s->iomgr_callbacks_pending) {
@@ -129,7 +129,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
closed by the system. */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
- grpc_winsocket_orphan(sp->socket);
+ grpc_winsocket_destroy(sp->socket);
}
gpr_free(s->ports);
gpr_free(s);
@@ -189,7 +189,6 @@ error:
static void decrement_active_ports_and_notify(server_port *sp) {
sp->shutting_down = 0;
- sp->socket->read_info.outstanding = 0;
gpr_mu_lock(&sp->server->mu);
GPR_ASSERT(sp->server->active_ports > 0);
if (0 == --sp->server->active_ports) {
@@ -462,7 +461,6 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset,
s->cb = cb;
s->cb_arg = cb_arg;
for (i = 0; i < s->nports; i++) {
- s->ports[i].socket->read_info.outstanding = 1;
start_accept(s->ports + i);
s->active_ports++;
}
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 58f9160ef9..fe3673c607 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -97,7 +97,7 @@ typedef struct grpc_tcp {
} grpc_tcp;
static void tcp_free(grpc_tcp *tcp) {
- grpc_winsocket_orphan(tcp->socket);
+ grpc_winsocket_destroy(tcp->socket);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
gpr_free(tcp);
@@ -135,55 +135,35 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
/* Asynchronous callback from the IOCP, or the background thread. */
-static int on_read(grpc_tcp *tcp, int from_iocp) {
+static int on_read(grpc_tcp *tcp, int success) {
grpc_winsocket *socket = tcp->socket;
gpr_slice sub;
gpr_slice *slice = NULL;
size_t nslices = 0;
- int success;
grpc_winsocket_callback_info *info = &socket->read_info;
int do_abort = 0;
- gpr_mu_lock(&tcp->mu);
- if (!from_iocp || tcp->shutting_down) {
- /* If we are here with from_iocp set to true, it means we got raced to
- shutting down the endpoint. No actual abort callback will happen
- though, so we're going to do it from here. */
- do_abort = 1;
- }
- gpr_mu_unlock(&tcp->mu);
-
- if (do_abort) {
- if (from_iocp) {
- tcp->socket->read_info.outstanding = 0;
+ if (success) {
+ if (socket->read_info.wsa_error != 0) {
+ if (socket->read_info.wsa_error != WSAECONNRESET) {
+ char *utf8_message = gpr_format_message(info->wsa_error);
+ gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
+ gpr_free(utf8_message);
+ }
+ success = 0;
gpr_slice_unref(tcp->read_slice);
- }
- return 0;
- }
-
- GPR_ASSERT(tcp->socket->read_info.outstanding);
-
- if (socket->read_info.wsa_error != 0) {
- if (socket->read_info.wsa_error != WSAECONNRESET) {
- char *utf8_message = gpr_format_message(info->wsa_error);
- gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
- gpr_free(utf8_message);
- }
- success = 0;
- gpr_slice_unref(tcp->read_slice);
- } else {
- if (info->bytes_transfered != 0) {
- sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
- gpr_slice_buffer_add(tcp->read_slices, sub);
- success = 1;
} else {
- gpr_slice_unref(tcp->read_slice);
- success = 0;
+ if (info->bytes_transfered != 0) {
+ sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
+ gpr_slice_buffer_add(tcp->read_slices, sub);
+ success = 1;
+ } else {
+ gpr_slice_unref(tcp->read_slice);
+ success = 0;
+ }
}
}
- tcp->socket->read_info.outstanding = 0;
-
return success;
}
@@ -209,14 +189,10 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
DWORD flags = 0;
WSABUF buffer;
- GPR_ASSERT(!tcp->socket->read_info.outstanding);
if (tcp->shutting_down) {
return GRPC_ENDPOINT_ERROR;
}
- TCP_REF(tcp, "read");
-
- tcp->socket->read_info.outstanding = 1;
tcp->read_cb = cb;
tcp->read_slices = read_slices;
gpr_slice_buffer_reset_and_unref(read_slices);
@@ -236,10 +212,11 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
int ok;
info->bytes_transfered = bytes_read;
ok = on_read(tcp, 1);
- TCP_UNREF(tcp, "read");
return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
}
+ TCP_REF(tcp, "read");
+
/* Otherwise, let's retry, by queuing a read. */
memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
@@ -260,52 +237,31 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
}
/* Asynchronous callback from the IOCP, or the background thread. */
-static void on_write(void *tcpp, int from_iocp) {
+static void on_write(void *tcpp, int success) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
grpc_iomgr_closure *cb;
- int success;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
cb = tcp->write_cb;
tcp->write_cb = NULL;
- if (!from_iocp || tcp->shutting_down) {
- /* If we are here with from_iocp set to true, it means we got raced to
- shutting down the endpoint. No actual abort callback will happen
- though, so we're going to do it from here. */
- do_abort = 1;
- }
gpr_mu_unlock(&tcp->mu);
- if (do_abort) {
- if (from_iocp) {
- tcp->socket->write_info.outstanding = 0;
- }
- TCP_UNREF(tcp, "write");
- if (cb) {
- cb->cb(cb->cb_arg, 0);
- }
- return;
- }
-
- GPR_ASSERT(tcp->socket->write_info.outstanding);
-
- if (info->wsa_error != 0) {
- if (info->wsa_error != WSAECONNRESET) {
- char *utf8_message = gpr_format_message(info->wsa_error);
- gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
- gpr_free(utf8_message);
+ if (success) {
+ if (info->wsa_error != 0) {
+ if (info->wsa_error != WSAECONNRESET) {
+ char *utf8_message = gpr_format_message(info->wsa_error);
+ gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
+ gpr_free(utf8_message);
+ }
+ success = 0;
+ } else {
+ GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
}
- success = 0;
- } else {
- GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
- success = 1;
}
- tcp->socket->write_info.outstanding = 0;
-
TCP_UNREF(tcp, "write");
cb->cb(cb->cb_arg, success);
}
@@ -324,13 +280,10 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
WSABUF *allocated = NULL;
WSABUF *buffers = local_buffers;
- GPR_ASSERT(!tcp->socket->write_info.outstanding);
if (tcp->shutting_down) {
return GRPC_ENDPOINT_ERROR;
}
- TCP_REF(tcp, "write");
- tcp->socket->write_info.outstanding = 1;
tcp->write_cb = cb;
tcp->write_slices = slices;
@@ -365,11 +318,11 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
}
}
if (allocated) gpr_free(allocated);
- tcp->socket->write_info.outstanding = 0;
- TCP_UNREF(tcp, "write");
return ret;
}
+ TCP_REF(tcp, "write");
+
/* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
operation, this time asynchronously. */
memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
@@ -380,7 +333,6 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
- tcp->socket->write_info.outstanding = 0;
TCP_UNREF(tcp, "write");
return GRPC_ENDPOINT_ERROR;
}
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index a1b773b1ca..7a42de9245 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -293,6 +293,9 @@ struct grpc_chttp2_transport {
gpr_refcount refs;
char *peer_string;
+ /** when this drops to zero it's safe to shutdown the endpoint */
+ gpr_refcount shutdown_ep_refs;
+
gpr_mu mu;
/** is the transport destroying itself? */
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 8caa10c938..aa7a7c9471 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -222,6 +222,8 @@ static void init_transport(grpc_chttp2_transport *t,
t->ep = ep;
/* one ref is for destroy, the other for when ep becomes NULL */
gpr_ref_init(&t->refs, 2);
+ /* ref is dropped at transport close() */
+ gpr_ref_init(&t->shutdown_ep_refs, 1);
gpr_mu_init(&t->mu);
grpc_mdctx_ref(mdctx);
t->peer_string = grpc_endpoint_get_peer(ep);
@@ -336,13 +338,26 @@ static void destroy_transport(grpc_transport *gt) {
UNREF_TRANSPORT(t, "destroy");
}
+/** block grpc_endpoint_shutdown being called until a paired
+ allow_endpoint_shutdown is made */
+static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) {
+ GPR_ASSERT(t->shutdown_ep_refs.count);
+ gpr_ref(&t->shutdown_ep_refs);
+}
+
+static void allow_endpoint_shutdown(grpc_chttp2_transport *t) {
+ if (gpr_unref(&t->shutdown_ep_refs)) {
+ grpc_endpoint_shutdown(t->ep);
+ }
+}
+
static void close_transport_locked(grpc_chttp2_transport *t) {
if (!t->closed) {
t->closed = 1;
connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE,
"close_transport");
if (t->ep) {
- grpc_endpoint_shutdown(t->ep);
+ allow_endpoint_shutdown(t);
}
}
}
@@ -471,6 +486,7 @@ static void unlock(grpc_chttp2_transport *t) {
t->writing_active = 1;
REF_TRANSPORT(t, "writing");
grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1);
+ prevent_endpoint_shutdown(t);
}
run_closures = t->global.pending_closures_head;
@@ -536,6 +552,7 @@ void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) {
static void writing_action(void *gt, int iomgr_success_ignored) {
grpc_chttp2_transport *t = gt;
grpc_chttp2_perform_writes(&t->writing, t->ep);
+ allow_endpoint_shutdown(t);
}
void grpc_chttp2_add_incoming_goaway(
@@ -1104,21 +1121,28 @@ static int recv_data_loop(grpc_chttp2_transport *t, int *success) {
read_error_locked(t);
} else {
keep_reading = 1;
+ prevent_endpoint_shutdown(t);
}
gpr_slice_buffer_reset_and_unref(&t->read_buffer);
unlock(t);
if (keep_reading) {
+ int ret = -1;
switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) {
case GRPC_ENDPOINT_DONE:
*success = 1;
- return 1;
+ ret = 1;
+ break;
case GRPC_ENDPOINT_ERROR:
*success = 0;
- return 1;
+ ret = 1;
+ break;
case GRPC_ENDPOINT_PENDING:
- return 0;
+ ret = 0;
+ break;
}
+ allow_endpoint_shutdown(t);
+ return ret;
} else {
UNREF_TRANSPORT(t, "recv_data");
return 0;
diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c
index 276fe758d9..3abde5ac35 100644
--- a/test/core/iomgr/endpoint_pair_test.c
+++ b/test/core/iomgr/endpoint_pair_test.c
@@ -33,13 +33,6 @@
#include "src/core/iomgr/tcp_posix.h"
-#include <errno.h>
-#include <fcntl.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <unistd.h>
-
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index ef673747a1..148aa9e4ca 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -39,6 +39,7 @@
#include <grpc/support/slice.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
#include "test/core/util/test_config.h"
/*
@@ -128,6 +129,7 @@ struct read_and_write_test_state {
static void read_and_write_test_read_handler(void *data, int success) {
struct read_and_write_test_state *state = data;
+loop:
state->bytes_read += count_slices(
state->incoming.slices, state->incoming.count, &state->current_read_data);
if (state->bytes_read == state->target_bytes || !success) {
@@ -140,11 +142,11 @@ static void read_and_write_test_read_handler(void *data, int success) {
switch (grpc_endpoint_read(state->read_ep, &state->incoming,
&state->done_read)) {
case GRPC_ENDPOINT_ERROR:
- read_and_write_test_read_handler(data, 0);
- break;
+ success = 0;
+ goto loop;
case GRPC_ENDPOINT_DONE:
- read_and_write_test_read_handler(data, 1);
- break;
+ success = 1;
+ goto loop;
case GRPC_ENDPOINT_PENDING:
break;
}
@@ -176,16 +178,17 @@ static void read_and_write_test_write_handler(void *data, int success) {
gpr_slice_buffer_addn(&state->outgoing, slices, nslices);
write_status = grpc_endpoint_write(state->write_ep, &state->outgoing,
&state->done_write);
- gpr_log(GPR_DEBUG, "write_status=%d", write_status);
- GPR_ASSERT(write_status != GRPC_ENDPOINT_ERROR);
free(slices);
if (write_status == GRPC_ENDPOINT_PENDING) {
return;
+ } else if (write_status == GRPC_ENDPOINT_ERROR) {
+ goto cleanup;
}
}
GPR_ASSERT(state->bytes_written == state->target_bytes);
}
+cleanup:
gpr_log(GPR_INFO, "Write handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->write_done = 1 + success;
@@ -204,6 +207,8 @@ static void read_and_write_test(grpc_endpoint_test_config config,
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
grpc_endpoint_test_fixture f =
begin_test(config, "read_and_write_test", slice_size);
+ gpr_log(GPR_DEBUG, "num_bytes=%d write_size=%d slice_size=%d shutdown=%d",
+ num_bytes, write_size, slice_size, shutdown);
if (shutdown) {
gpr_log(GPR_INFO, "Start read and write shutdown test");
@@ -264,11 +269,11 @@ static void read_and_write_test(grpc_endpoint_test_config config,
}
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- grpc_endpoint_destroy(state.read_ep);
- grpc_endpoint_destroy(state.write_ep);
+ end_test(config);
gpr_slice_buffer_destroy(&state.outgoing);
gpr_slice_buffer_destroy(&state.incoming);
- end_test(config);
+ grpc_endpoint_destroy(state.read_ep);
+ grpc_endpoint_destroy(state.write_ep);
}
struct timeout_test_state {
@@ -286,6 +291,7 @@ static void shutdown_during_write_test_read_handler(void *user_data,
int success) {
shutdown_during_write_test_state *st = user_data;
+loop:
if (!success) {
grpc_endpoint_destroy(st->ep);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
@@ -297,11 +303,11 @@ static void shutdown_during_write_test_read_handler(void *user_data,
case GRPC_ENDPOINT_PENDING:
break;
case GRPC_ENDPOINT_ERROR:
- shutdown_during_write_test_read_handler(user_data, 0);
- break;
+ success = 0;
+ goto loop;
case GRPC_ENDPOINT_DONE:
- shutdown_during_write_test_read_handler(user_data, 1);
- break;
+ success = 1;
+ goto loop;
}
}
}
@@ -324,86 +330,15 @@ static void shutdown_during_write_test_write_handler(void *user_data,
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
}
-static void shutdown_during_write_test(grpc_endpoint_test_config config,
- size_t slice_size) {
- /* test that shutdown with a pending write creates no leaks */
- gpr_timespec deadline;
- size_t size;
- size_t nblocks;
- int current_data = 1;
- shutdown_during_write_test_state read_st;
- shutdown_during_write_test_state write_st;
- gpr_slice *slices;
- gpr_slice_buffer outgoing;
- grpc_iomgr_closure done_write;
- grpc_endpoint_test_fixture f =
- begin_test(config, "shutdown_during_write_test", slice_size);
-
- gpr_log(GPR_INFO, "testing shutdown during a write");
-
- read_st.ep = f.client_ep;
- write_st.ep = f.server_ep;
- read_st.done = 0;
- write_st.done = 0;
-
- grpc_iomgr_closure_init(&done_write, shutdown_during_write_test_write_handler,
- &write_st);
- grpc_iomgr_closure_init(&read_st.done_read,
- shutdown_during_write_test_read_handler, &read_st);
- gpr_slice_buffer_init(&read_st.incoming);
- gpr_slice_buffer_init(&outgoing);
-
- GPR_ASSERT(grpc_endpoint_read(read_st.ep, &read_st.incoming,
- &read_st.done_read) == GRPC_ENDPOINT_PENDING);
- for (size = 1;; size *= 2) {
- slices = allocate_blocks(size, 1, &nblocks, &current_data);
- gpr_slice_buffer_reset_and_unref(&outgoing);
- gpr_slice_buffer_addn(&outgoing, slices, nblocks);
- switch (grpc_endpoint_write(write_st.ep, &outgoing, &done_write)) {
- case GRPC_ENDPOINT_DONE:
- break;
- case GRPC_ENDPOINT_ERROR:
- gpr_log(GPR_ERROR, "error writing");
- abort();
- case GRPC_ENDPOINT_PENDING:
- grpc_endpoint_shutdown(write_st.ep);
- deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
- gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- while (!write_st.done) {
- grpc_pollset_worker worker;
- GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
- grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- deadline);
- }
- gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- grpc_endpoint_destroy(write_st.ep);
- gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- while (!read_st.done) {
- grpc_pollset_worker worker;
- GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
- grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- deadline);
- }
- gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- gpr_free(slices);
- gpr_slice_buffer_destroy(&read_st.incoming);
- gpr_slice_buffer_destroy(&outgoing);
- end_test(config);
- return;
- }
- gpr_free(slices);
- }
-
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
-}
-
void grpc_endpoint_tests(grpc_endpoint_test_config config,
grpc_pollset *pollset) {
+ size_t i;
g_pollset = pollset;
read_and_write_test(config, 10000000, 100000, 8192, 0);
read_and_write_test(config, 1000000, 100000, 1, 0);
read_and_write_test(config, 100000000, 100000, 1, 1);
- shutdown_during_write_test(config, 1000);
+ for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
+ read_and_write_test(config, 40320, i, i, 0);
+ }
g_pollset = NULL;
}