aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2015-05-03 10:40:56 +0200
committerGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2015-05-04 08:00:45 +0200
commit0f981e9f1904cc169163836bb6f88bf588b5da8f (patch)
tree3fba812b11f5dc40c09868334e6f9361484714e8 /src/core
parent40b1e23b8c3998c6f64401c558872c5747c00f29 (diff)
Another round of win32 fixes and documentation.
-) Fixed a few more (much more rare) race conditions on shutdown. -) Fixed a degenerate case if we create a server but never start it.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/iomgr/iocp_windows.c4
-rw-r--r--src/core/iomgr/iomgr_windows.c4
-rw-r--r--src/core/iomgr/pollset_kick_windows.h3
-rw-r--r--src/core/iomgr/pollset_windows.c5
-rw-r--r--src/core/iomgr/pollset_windows.h8
-rw-r--r--src/core/iomgr/socket_windows.c29
-rw-r--r--src/core/iomgr/socket_windows.h54
-rw-r--r--src/core/iomgr/tcp_server_windows.c28
-rw-r--r--src/core/iomgr/tcp_windows.c110
9 files changed, 203 insertions, 42 deletions
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index e7fc744ceb..1cdf3da0d6 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -177,6 +177,10 @@ void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
socket->orphan = 1;
}
+/* Calling notify_on_read or write means either of two things:
+ -) The IOCP already completed in the background, and we need to call
+ the callback now.
+ -) The IOCP hasn't completed yet, and we're queuing it for later. */
static void socket_notify_on_iocp(grpc_winsocket *socket,
void(*cb)(void *, int), void *opaque,
grpc_winsocket_callback_info *info) {
diff --git a/src/core/iomgr/iomgr_windows.c b/src/core/iomgr/iomgr_windows.c
index f130ab9a07..74cd5a829b 100644
--- a/src/core/iomgr/iomgr_windows.c
+++ b/src/core/iomgr/iomgr_windows.c
@@ -43,6 +43,10 @@
#include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr.h"
+/* Windows' io manager is going to be fully designed using IO completion
+ ports. All of what we're doing here is basically make sure that
+ Windows sockets are initialized in and out. */
+
static void winsock_init(void) {
WSADATA wsaData;
int status = WSAStartup(MAKEWORD(2, 0), &wsaData);
diff --git a/src/core/iomgr/pollset_kick_windows.h b/src/core/iomgr/pollset_kick_windows.h
index 3836aa0082..c675c119ab 100644
--- a/src/core/iomgr/pollset_kick_windows.h
+++ b/src/core/iomgr/pollset_kick_windows.h
@@ -36,6 +36,9 @@
#include <grpc/support/sync.h>
+/* There isn't really any such thing as a pollset under Windows, due to the
+ nature of the IO completion ports. */
+
struct grpc_kick_fd_info;
typedef struct grpc_pollset_kick_state {
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index bea6711611..5af0685f9d 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -41,6 +41,11 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset_windows.h"
+/* There isn't really any such thing as a pollset under Windows, due to the
+ nature of the IO completion ports. We're still going to provide a minimal
+ set of features for the sake of the rest of grpc. But grpc_pollset_work
+ won't actually do any polling, and return as quickly as possible. */
+
void grpc_pollset_init(grpc_pollset *pollset) {
gpr_mu_init(&pollset->mu);
gpr_cv_init(&pollset->cv);
diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h
index 266175abfb..e1115bac4f 100644
--- a/src/core/iomgr/pollset_windows.h
+++ b/src/core/iomgr/pollset_windows.h
@@ -40,10 +40,10 @@
#include "src/core/iomgr/pollset_kick.h"
#include "src/core/iomgr/socket_windows.h"
-/* forward declare only in this file to avoid leaking impl details via
- pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not
- use the struct tag */
-struct grpc_fd;
+/* There isn't really any such thing as a pollset under Windows, due to the
+ nature of the IO completion ports. A Windows "pollset" is merely a mutex
+ and a condition variable, as this is the minimal set of features we need
+ implemented for the rest of grpc. But we won't use them directly. */
typedef struct grpc_pollset {
gpr_mu mu;
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index fe0196d99c..9306310d43 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -41,9 +41,9 @@
#include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h"
-#include "src/core/iomgr/socket_windows.h"
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h"
+#include "src/core/iomgr/socket_windows.h"
grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket));
@@ -55,16 +55,29 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
return r;
}
-static void shutdown_op(grpc_winsocket_callback_info *info) {
- if (!info->cb) return;
- grpc_iomgr_add_delayed_callback(info->cb, info->opaque, 0);
-}
-
+/* Schedule a shutdown of the socket operations. Will call the pending
+ operations to abort them. We need to do that this way because of the
+ various callsites of that function, which happens to be in various
+ mutex hold states, and that'd be unsafe to call them directly. */
void grpc_winsocket_shutdown(grpc_winsocket *socket) {
- shutdown_op(&socket->read_info);
- shutdown_op(&socket->write_info);
+ gpr_mu_lock(&socket->state_mu);
+ if (socket->read_info.cb) {
+ grpc_iomgr_add_delayed_callback(socket->read_info.cb,
+ socket->read_info.opaque, 0);
+ }
+ if (socket->write_info.cb) {
+ grpc_iomgr_add_delayed_callback(socket->write_info.cb,
+ socket->write_info.opaque, 0);
+ }
+ gpr_mu_unlock(&socket->state_mu);
}
+/* Abandons a socket. Either we're going to queue it up for garbage collecting
+ from the IO Completion Port thread, or destroy it immediately. Note that this
+ mechanisms assumes that we're either always waiting for an operation, or we
+ explicitely know that we don't. If there is a future case where we can have
+ an "idle" socket which is neither trying to read or write, we'd start leaking
+ both memory and sockets. */
void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
SOCKET socket = winsocket->socket;
if (!winsocket->closed_early) {
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index ee090668ea..6e778a776a 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -39,21 +39,43 @@
#include <grpc/support/sync.h>
#include <grpc/support/atm.h>
+/* This holds the data for an outstanding read or write on a socket.
+ The mutex to protect the concurrent access to that data is the one
+ inside the winsocket wrapper. */
typedef struct grpc_winsocket_callback_info {
/* This is supposed to be a WSAOVERLAPPED, but in order to get that
- * definition, we need to include ws2tcpip.h, which needs to be included
- * from the top, otherwise it'll clash with a previous inclusion of
- * windows.h that in turns includes winsock.h. If anyone knows a way
- * to do it properly, feel free to send a patch.
- */
+ definition, we need to include ws2tcpip.h, which needs to be included
+ from the top, otherwise it'll clash with a previous inclusion of
+ windows.h that in turns includes winsock.h. If anyone knows a way
+ to do it properly, feel free to send a patch. */
OVERLAPPED overlapped;
+ /* The callback information for the pending operation. May be empty if the
+ caller hasn't registered a callback yet. */
void(*cb)(void *opaque, int success);
void *opaque;
+ /* A boolean to describe if the IO Completion Port got a notification for
+ that operation. This will happen if the operation completed before the
+ called had time to register a callback. We could avoid that behavior
+ altogether by forcing the caller to always register its callback before
+ proceeding queue an operation, but it is frequent for an IO Completion
+ Port to trigger quickly. This way we avoid a context switch for calling
+ the callback. We also simplify the read / write operations to avoid having
+ to hold a mutex for a long amount of time. */
int has_pending_iocp;
+ /* The results of the overlapped operation. */
DWORD bytes_transfered;
int wsa_error;
} grpc_winsocket_callback_info;
+/* This is a wrapper to a Windows socket. A socket can have one outstanding
+ read, and one outstanding write. Doing an asynchronous accept means waiting
+ for a read operation. Doing an asynchronous connect means waiting for a
+ write operation. These are completely abitrary ties between the operation
+ and the kind of event, because we can have one overlapped per pending
+ operation, whichever its nature is. So we could have more dedicated pending
+ operation callbacks for connect and listen. But given the scope of listen
+ and accept, we don't need to go to that extent and waste memory. Also, this
+ is closer to what happens in posix world. */
typedef struct grpc_winsocket {
SOCKET socket;
@@ -62,17 +84,35 @@ typedef struct grpc_winsocket {
gpr_mu state_mu;
+ /* You can't add the same socket twice to the same IO Completion Port.
+ This prevents that. */
int added_to_iocp;
+ /* A boolean to indicate that the caller has abandonned that socket, but
+ there is a pending operation that the IO Completion Port will have to
+ wait for. The socket will be collected at that time. */
int orphan;
+ /* A boolean to indicate that the socket was already closed somehow, and
+ that no operation is going to be pending. Trying to abandon a socket in
+ that state won't result in an orphan, but will instead be destroyed
+ without further delay. We could avoid that boolean by adding one into
+ grpc_winsocket_callback_info describing that the operation is pending,
+ but that 1) waste memory more and 2) obfuscate the intent a bit more. */
int closed_early;
} grpc_winsocket;
-/* Create a wrapped windows handle.
-This takes ownership of closing it. */
+/* Create a wrapped windows handle. This takes ownership of it, meaning that
+ it will be responsible for closing it. */
grpc_winsocket *grpc_winsocket_create(SOCKET socket);
+/* Initiate an asynchronous shutdown of the socket. Will call off any pending
+ operation to cancel them. */
void grpc_winsocket_shutdown(grpc_winsocket *socket);
+
+/* Abandon a socket. */
void grpc_winsocket_orphan(grpc_winsocket *socket);
+
+/* Destroy a socket. Should only be called by the IO Completion Port thread,
+ or by grpc_winsocket_orphan if there's no pending operation. */
void grpc_winsocket_destroy(grpc_winsocket *socket);
#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index c4d3293e83..c6137e1e1d 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -109,7 +109,8 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
/* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts. */
for (i = 0; i < s->nports; i++) {
- grpc_winsocket_shutdown(s->ports[i].socket);
+ server_port *sp = &s->ports[i];
+ grpc_winsocket_shutdown(sp->socket);
}
/* This happens asynchronously. Wait while that happens. */
while (s->active_ports) {
@@ -275,22 +276,28 @@ static void on_accept(void *arg, int from_iocp) {
/* If we're not notified from the IOCP, it means we are asked to shutdown.
This will initiate that shutdown. Calling closesocket will trigger an
IOCP notification, that will call this function a second time, from
- the IOCP thread. */
+ the IOCP thread. Of course, this only works if the socket was, in fact,
+ listening. If that's not the case, we'd wait indefinitely. That's a bit
+ of a degenerate case, but it can happen if you create a server, but
+ don't start it. So let's support that by recursing once. */
sp->shutting_down = 1;
sp->new_socket = INVALID_SOCKET;
- closesocket(sock);
+ if (sock != INVALID_SOCKET) {
+ closesocket(sock);
+ } else {
+ on_accept(sp, 1);
+ }
+ return;
}
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
if (ep) sp->server->cb(sp->server->cb_arg, ep);
- if (from_iocp) {
- /* As we were notified from the IOCP of one and exactly one accept,
- the former socked we created has now either been destroy or assigned
- to the new connection. We need to create a new one for the next
- connection. */
- start_accept(sp);
- }
+ /* As we were notified from the IOCP of one and exactly one accept,
+ the former socked we created has now either been destroy or assigned
+ to the new connection. We need to create a new one for the next
+ connection. */
+ start_accept(sp);
}
static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
@@ -332,6 +339,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp->socket = grpc_winsocket_create(sock);
sp->shutting_down = 0;
sp->AcceptEx = AcceptEx;
+ sp->new_socket = INVALID_SOCKET;
GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu);
}
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index d5b06e7b0b..c8483bd891 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -76,8 +76,11 @@ int grpc_tcp_prepare_socket(SOCKET sock) {
}
typedef struct grpc_tcp {
+ /* This is our C++ class derivation emulation. */
grpc_endpoint base;
+ /* The one socket this endpoint is using. */
grpc_winsocket *socket;
+ /* Refcounting how many operations are in progress. */
gpr_refcount refcount;
grpc_endpoint_read_cb read_cb;
@@ -90,6 +93,10 @@ typedef struct grpc_tcp {
gpr_slice_buffer write_slices;
int outstanding_write;
+ /* The IO Completion Port runs from another thread. We need some mechanism
+ to protect ourselves when requesting a shutdown. */
+ gpr_mu mu;
+ int shutting_down;
} grpc_tcp;
static void tcp_ref(grpc_tcp *tcp) {
@@ -100,11 +107,13 @@ static void tcp_unref(grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) {
gpr_slice_buffer_destroy(&tcp->write_slices);
grpc_winsocket_orphan(tcp->socket);
+ gpr_mu_destroy(&tcp->mu);
gpr_free(tcp);
}
}
-static void on_read(void *tcpp, int success) {
+/* Asynchronous callback from the IOCP, or the background thread. */
+static void on_read(void *tcpp, int from_iocp) {
grpc_tcp *tcp = (grpc_tcp *) tcpp;
grpc_winsocket *socket = tcp->socket;
gpr_slice sub;
@@ -114,16 +123,25 @@ static void on_read(void *tcpp, int success) {
grpc_endpoint_read_cb cb = tcp->read_cb;
grpc_winsocket_callback_info *info = &socket->read_info;
void *opaque = tcp->read_user_data;
+ int do_abort = 0;
+
+ gpr_mu_lock(&tcp->mu);
+ if (!from_iocp || tcp->shutting_down) {
+ /* If we are here with from_iocp set to true, it means we got raced to
+ shutting down the endpoint. No actual abort callback will happen
+ though, so we're going to do it from here. */
+ do_abort = 1;
+ }
+ gpr_mu_unlock(&tcp->mu);
- GPR_ASSERT(tcp->outstanding_read);
-
- if (!success) {
+ if (do_abort) {
+ if (from_iocp) gpr_slice_unref(tcp->read_slice);
tcp_unref(tcp);
cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
return;
}
- tcp->outstanding_read = 0;
+ GPR_ASSERT(tcp->outstanding_read);
if (socket->read_info.wsa_error != 0) {
char *utf8_message = gpr_format_message(info->wsa_error);
@@ -142,6 +160,9 @@ static void on_read(void *tcpp, int success) {
status = GRPC_ENDPOINT_CB_EOF;
}
}
+
+ tcp->outstanding_read = 0;
+
tcp_unref(tcp);
cb(opaque, slice, nslices, status);
}
@@ -158,6 +179,7 @@ static void win_notify_on_read(grpc_endpoint *ep,
WSABUF buffer;
GPR_ASSERT(!tcp->outstanding_read);
+ GPR_ASSERT(!tcp->shutting_down);
tcp_ref(tcp);
tcp->outstanding_read = 1;
tcp->read_cb = cb;
@@ -168,10 +190,12 @@ static void win_notify_on_read(grpc_endpoint *ep,
buffer.len = GPR_SLICE_LENGTH(tcp->read_slice);
buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
+ /* First let's try a synchronous, non-blocking read. */
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
NULL, NULL);
info->wsa_error = status == 0 ? 0 : WSAGetLastError();
+ /* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read;
/* This might heavily recurse. */
@@ -179,6 +203,7 @@ static void win_notify_on_read(grpc_endpoint *ep,
return;
}
+ /* Otherwise, let's retry, by queuing a read. */
memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
&info->overlapped, NULL);
@@ -192,30 +217,53 @@ static void win_notify_on_read(grpc_endpoint *ep,
if (error != WSA_IO_PENDING) {
char *utf8_message = gpr_format_message(WSAGetLastError());
- __debugbreak();
- gpr_log(GPR_ERROR, "WSARecv error: %s", utf8_message);
+ gpr_log(GPR_ERROR, "WSARecv error: %s - this means we're going to leak.",
+ utf8_message);
gpr_free(utf8_message);
- /* would the IO completion port be called anyway... ? Let's assume not. */
+ /* I'm pretty sure this is a very bad situation there. Hence the log.
+ What will happen now is that the socket will neither wait for read
+ or write, unless the caller retry, which is unlikely, but I am not
+ sure if that's guaranteed. And there might also be a write pending.
+ This means that the future orphanage of that socket will be in limbo,
+ and we're going to leak it. I have no idea what could cause this
+ specific case however, aside from a parameter error from our call.
+ Normal read errors would actually happen during the overlapped
+ operation, which is the supported way to go for that. */
tcp->outstanding_read = 0;
tcp_unref(tcp);
cb(arg, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
+ /* Per the comment above, I'm going to treat that case as a hard failure
+ for now, and leave the option to catch that and debug. */
+ __debugbreak();
return;
}
grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
}
-static void on_write(void *tcpp, int success) {
+/* Asynchronous callback from the IOCP, or the background thread. */
+static void on_write(void *tcpp, int from_iocp) {
grpc_tcp *tcp = (grpc_tcp *) tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK;
grpc_endpoint_write_cb cb = tcp->write_cb;
void *opaque = tcp->write_user_data;
+ int do_abort = 0;
+
+ gpr_mu_lock(&tcp->mu);
+ if (!from_iocp || tcp->shutting_down) {
+ /* If we are here with from_iocp set to true, it means we got raced to
+ shutting down the endpoint. No actual abort callback will happen
+ though, so we're going to do it from here. */
+ do_abort = 1;
+ }
+ gpr_mu_unlock(&tcp->mu);
GPR_ASSERT(tcp->outstanding_write);
- if (!success) {
+ if (do_abort) {
+ if (from_iocp) gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
tcp_unref(tcp);
cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN);
return;
@@ -238,6 +286,7 @@ static void on_write(void *tcpp, int success) {
cb(opaque, status);
}
+/* Initiates a write. */
static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
gpr_slice *slices, size_t nslices,
grpc_endpoint_write_cb cb,
@@ -253,11 +302,13 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
WSABUF *buffers = local_buffers;
GPR_ASSERT(!tcp->outstanding_write);
+ GPR_ASSERT(!tcp->shutting_down);
tcp_ref(tcp);
tcp->outstanding_write = 1;
tcp->write_cb = cb;
tcp->write_user_data = arg;
+
gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices);
if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) {
@@ -270,10 +321,14 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]);
}
+ /* First, let's try a synchronous, non-blocking write. */
status = WSASend(socket->socket, buffers, tcp->write_slices.count,
&bytes_sent, 0, NULL, NULL);
info->wsa_error = status == 0 ? 0 : WSAGetLastError();
+ /* We would kind of expect to get a WSAEWOULDBLOCK here, especially on a busy
+ connection that has its send queue filled up. But if we don't, then we can
+ avoid doing an async write operation at all. */
if (info->wsa_error != WSAEWOULDBLOCK) {
grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR;
if (status == 0) {
@@ -291,25 +346,42 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
return ret;
}
+ /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
+ operation, this time asynchronously. */
memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
status = WSASend(socket->socket, buffers, tcp->write_slices.count,
&bytes_sent, 0, &socket->write_info.overlapped, NULL);
if (allocated) gpr_free(allocated);
+ /* It is possible the operation completed then. But we'd still get an IOCP
+ notification. So let's ignore it and wait for the IOCP. */
if (status != 0) {
int error = WSAGetLastError();
if (error != WSA_IO_PENDING) {
char *utf8_message = gpr_format_message(WSAGetLastError());
- __debugbreak();
- gpr_log(GPR_ERROR, "WSASend error: %s", utf8_message);
+ gpr_log(GPR_ERROR, "WSASend error: %s - this means we're going to leak.",
+ utf8_message);
gpr_free(utf8_message);
- /* would the IO completion port be called anyway ? Let's assume not. */
+ /* I'm pretty sure this is a very bad situation there. Hence the log.
+ What will happen now is that the socket will neither wait for read
+ or write, unless the caller retry, which is unlikely, but I am not
+ sure if that's guaranteed. And there might also be a read pending.
+ This means that the future orphanage of that socket will be in limbo,
+ and we're going to leak it. I have no idea what could cause this
+ specific case however, aside from a parameter error from our call.
+ Normal read errors would actually happen during the overlapped
+ operation, which is the supported way to go for that. */
tcp->outstanding_write = 0;
tcp_unref(tcp);
+ /* Per the comment above, I'm going to treat that case as a hard failure
+ for now, and leave the option to catch that and debug. */
+ __debugbreak();
return GRPC_ENDPOINT_WRITE_ERROR;
}
}
+ /* As all is now setup, we can now ask for the IOCP notification. It may
+ trigger the callback immediately however, but no matter. */
grpc_socket_notify_on_write(socket, on_write, tcp);
return GRPC_ENDPOINT_WRITE_PENDING;
}
@@ -319,9 +391,20 @@ static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
grpc_iocp_add_socket(tcp->socket);
}
+/* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
+ for the potential read and write operations. It is up to the caller to
+ guarantee this isn't called in parallel to a read or write request, so
+ we're not going to protect against these. However the IO Completion Port
+ callback will happen from another thread, so we need to protect against
+ concurrent access of the data structure in that regard. */
static void win_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *) ep;
+ gpr_mu_lock(&tcp->mu);
+ /* At that point, what may happen is that we're already inside the IOCP
+ callback. See the comments in on_read and on_write. */
+ tcp->shutting_down = 1;
grpc_winsocket_shutdown(tcp->socket);
+ gpr_mu_unlock(&tcp->mu);
}
static void win_destroy(grpc_endpoint *ep) {
@@ -338,6 +421,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) {
memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
tcp->socket = socket;
+ gpr_mu_init(&tcp->mu);
gpr_slice_buffer_init(&tcp->write_slices);
gpr_ref_init(&tcp->refcount, 1);
return &tcp->base;