aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/pollset_windows.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr/pollset_windows.c')
-rw-r--r--src/core/iomgr/pollset_windows.c30
1 files changed, 26 insertions, 4 deletions
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index fdebede482..134e6f45e2 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -38,6 +38,8 @@
#include <winsock2.h>
#include <grpc/support/log.h>
+#include <grpc/support/log_win32.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/thd.h>
#include "src/core/iomgr/alarm_internal.h"
@@ -53,6 +55,8 @@ static gpr_event g_shutdown_global_poller;
static gpr_event g_global_poller_done;
void grpc_pollset_init(grpc_pollset *pollset) {
+ gpr_mu_init(&pollset->mu);
+ gpr_cv_init(&pollset->cv);
pollset->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
(ULONG_PTR)NULL, 0);
GPR_ASSERT(pollset->iocp);
@@ -60,6 +64,8 @@ void grpc_pollset_init(grpc_pollset *pollset) {
void grpc_pollset_destroy(grpc_pollset *pollset) {
BOOL status;
+ gpr_mu_destroy(&pollset->mu);
+ gpr_cv_destroy(&pollset->cv);
status = CloseHandle(pollset->iocp);
GPR_ASSERT(status);
}
@@ -76,10 +82,11 @@ static int pollset_poll(grpc_pollset *pollset,
grpc_winsocket_callback_info *info;
void(*f)(void *, int) = NULL;
void *opaque = NULL;
+ gpr_mu_unlock(&pollset->mu);
success = GetQueuedCompletionStatus(pollset->iocp, &bytes,
&completion_key, &overlapped,
gpr_time_to_millis(wait_time));
-
+ gpr_mu_lock(&pollset->mu);
if (!success && !overlapped) {
/* The deadline got attained. */
return 0;
@@ -95,6 +102,8 @@ static int pollset_poll(grpc_pollset *pollset,
abort();
}
+ GPR_ASSERT(pollset == &g_global_pollset);
+
socket = (grpc_winsocket*) completion_key;
if (overlapped == &socket->write_info.overlapped) {
gpr_log(GPR_DEBUG, "pollset_poll - got write packet");
@@ -153,7 +162,9 @@ void grpc_pollset_kick(grpc_pollset *pollset) {
static void global_poller(void *p) {
while (!gpr_event_get(&g_shutdown_global_poller)) {
+ gpr_mu_lock(&g_global_pollset.mu);
grpc_pollset_work(&g_global_pollset, gpr_inf_future);
+ gpr_mu_unlock(&g_global_pollset.mu);
}
gpr_event_set(&g_global_poller_done, (void *) 1);
@@ -176,9 +187,20 @@ void grpc_pollset_global_shutdown(void) {
}
void grpc_pollset_add_handle(grpc_pollset *pollset, grpc_winsocket *socket) {
- HANDLE ret = CreateIoCompletionPort((HANDLE) socket->socket, pollset->iocp,
- (gpr_uintptr) socket, 0);
- GPR_ASSERT(ret == pollset->iocp);
+ HANDLE ret;
+ if (socket->added_to_iocp) return;
+ ret = CreateIoCompletionPort((HANDLE)socket->socket,
+ g_global_pollset.iocp,
+ (gpr_uintptr) socket, 0);
+ if (!ret) {
+ char *utf8_message = gpr_format_message(WSAGetLastError());
+ gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message);
+ gpr_free(utf8_message);
+ __debugbreak();
+ abort();
+ }
+ socket->added_to_iocp = 1;
+ GPR_ASSERT(ret == g_global_pollset.iocp);
}
static void handle_notify_on_iocp(grpc_winsocket *socket,