aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
authorGravatar Nicolas Noble <nnoble@google.com>2015-02-09 16:20:49 -0800
committerGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2015-02-10 01:25:42 +0100
commit45e67a37ae63b14d22eab7ee4bea9b912baa7010 (patch)
treee35572753aa8b882176d49ef8ce98dbecd9745b9 /src/core/iomgr
parent0f3ec822380081670edd0e9e1d7e40c1122bda21 (diff)
Addressing comments.
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/iocp_windows.c200
-rw-r--r--src/core/iomgr/iocp_windows.h (renamed from src/core/iomgr/iomgr_windows.h)20
-rw-r--r--src/core/iomgr/iomgr_windows.c6
-rw-r--r--src/core/iomgr/pollset_windows.c174
-rw-r--r--src/core/iomgr/pollset_windows.h11
-rw-r--r--src/core/iomgr/socket_windows.c3
-rw-r--r--src/core/iomgr/tcp_client_windows.c2
-rw-r--r--src/core/iomgr/tcp_server_windows.c2
-rw-r--r--src/core/iomgr/tcp_windows.c15
9 files changed, 232 insertions, 201 deletions
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
new file mode 100644
index 0000000000..729b11b78d
--- /dev/null
+++ b/src/core/iomgr/iocp_windows.c
@@ -0,0 +1,200 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_WINSOCK_SOCKET
+
+#include <winsock2.h>
+
+#include <grpc/support/log.h>
+#include <grpc/support/log_win32.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/thd.h>
+
+#include "src/core/iomgr/alarm_internal.h"
+#include "src/core/iomgr/iocp_windows.h"
+#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/socket_windows.h"
+
+static ULONG g_iocp_kick_token;
+static OVERLAPPED g_iocp_custom_overlap;
+
+static gpr_event g_shutdown_iocp;
+static gpr_event g_iocp_done;
+
+static HANDLE g_iocp;
+
+static int do_iocp_work() {
+ BOOL success;
+ DWORD bytes = 0;
+ DWORD flags = 0;
+ ULONG_PTR completion_key;
+ LPOVERLAPPED overlapped;
+ gpr_timespec wait_time = gpr_inf_future;
+ grpc_winsocket *socket;
+ grpc_winsocket_callback_info *info;
+ void(*f)(void *, int) = NULL;
+ void *opaque = NULL;
+ success = GetQueuedCompletionStatus(g_iocp, &bytes,
+ &completion_key, &overlapped,
+ gpr_time_to_millis(wait_time));
+ if (!success && !overlapped) {
+ /* The deadline got attained. */
+ return 0;
+ }
+ GPR_ASSERT(completion_key && overlapped);
+ if (overlapped == &g_iocp_custom_overlap) {
+ if (completion_key == (ULONG_PTR) &g_iocp_kick_token) {
+ /* We were awoken from a kick. */
+ gpr_log(GPR_DEBUG, "do_iocp_work - got a kick");
+ return 1;
+ }
+ gpr_log(GPR_ERROR, "Unknown custom completion key.");
+ abort();
+ }
+
+ socket = (grpc_winsocket*) completion_key;
+ if (overlapped == &socket->write_info.overlapped) {
+ gpr_log(GPR_DEBUG, "do_iocp_work - got write packet");
+ info = &socket->write_info;
+ } else if (overlapped == &socket->read_info.overlapped) {
+ gpr_log(GPR_DEBUG, "do_iocp_work - got read packet");
+ info = &socket->read_info;
+ } else {
+ gpr_log(GPR_ERROR, "Unknown IOCP operation");
+ abort();
+ }
+ success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
+ FALSE, &flags);
+ gpr_log(GPR_DEBUG, "bytes: %u, flags: %u - op %s", bytes, flags,
+ success ? "succeeded" : "failed");
+ info->bytes_transfered = bytes;
+ info->wsa_error = success ? 0 : WSAGetLastError();
+ GPR_ASSERT(overlapped == &info->overlapped);
+ gpr_mu_lock(&socket->state_mu);
+ GPR_ASSERT(!info->has_pending_iocp);
+ if (info->cb) {
+ f = info->cb;
+ opaque = info->opaque;
+ info->cb = NULL;
+ } else {
+ info->has_pending_iocp = 1;
+ }
+ gpr_mu_unlock(&socket->state_mu);
+ if (f) f(opaque, 1);
+
+ return 1;
+}
+
+static void iocp_loop(void *p) {
+ while (!gpr_event_get(&g_shutdown_iocp)) {
+ grpc_maybe_call_delayed_callbacks(NULL, 1);
+ do_iocp_work();
+ }
+
+ gpr_event_set(&g_iocp_done, (void *)1);
+}
+
+void grpc_iocp_init(void) {
+ gpr_thd_id id;
+
+ g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
+ (ULONG_PTR)NULL, 0);
+ GPR_ASSERT(g_iocp);
+
+ gpr_event_init(&g_iocp_done);
+ gpr_event_init(&g_shutdown_iocp);
+ gpr_thd_new(&id, iocp_loop, NULL, NULL);
+}
+
+void grpc_iocp_shutdown(void) {
+ BOOL success;
+ gpr_event_set(&g_shutdown_iocp, (void *)1);
+ success = PostQueuedCompletionStatus(g_iocp, 0,
+ (ULONG_PTR) &g_iocp_kick_token,
+ &g_iocp_custom_overlap);
+ GPR_ASSERT(success);
+ gpr_event_wait(&g_iocp_done, gpr_inf_future);
+ success = CloseHandle(g_iocp);
+ GPR_ASSERT(success);
+}
+
+void grpc_iocp_add_socket(grpc_winsocket *socket) {
+ HANDLE ret;
+ if (socket->added_to_iocp) return;
+ ret = CreateIoCompletionPort((HANDLE)socket->socket,
+ g_iocp, (gpr_uintptr) socket, 0);
+ if (!ret) {
+ char *utf8_message = gpr_format_message(WSAGetLastError());
+ gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message);
+ gpr_free(utf8_message);
+ __debugbreak();
+ abort();
+ }
+ socket->added_to_iocp = 1;
+ GPR_ASSERT(ret == g_iocp);
+}
+
+static void socket_notify_on_iocp(grpc_winsocket *socket,
+ void(*cb)(void *, int), void *opaque,
+ grpc_winsocket_callback_info *info) {
+ int run_now = 0;
+ GPR_ASSERT(!info->cb);
+ gpr_mu_lock(&socket->state_mu);
+ if (info->has_pending_iocp) {
+ run_now = 1;
+ info->has_pending_iocp = 0;
+ gpr_log(GPR_DEBUG, "socket_notify_on_iocp - runs now");
+ } else {
+ info->cb = cb;
+ info->opaque = opaque;
+ gpr_log(GPR_DEBUG, "socket_notify_on_iocp - queued");
+ }
+ gpr_mu_unlock(&socket->state_mu);
+ if (run_now) cb(opaque, 1);
+}
+
+void grpc_socket_notify_on_write(grpc_winsocket *socket,
+ void(*cb)(void *, int), void *opaque) {
+ gpr_log(GPR_DEBUG, "grpc_socket_notify_on_write");
+ socket_notify_on_iocp(socket, cb, opaque, &socket->write_info);
+}
+
+void grpc_socket_notify_on_read(grpc_winsocket *socket,
+ void(*cb)(void *, int), void *opaque) {
+ gpr_log(GPR_DEBUG, "grpc_socket_notify_on_read");
+ socket_notify_on_iocp(socket, cb, opaque, &socket->read_info);
+}
+
+#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/iomgr_windows.h b/src/core/iomgr/iocp_windows.h
index 2d9449c1f6..bf5b90978e 100644
--- a/src/core/iomgr/iomgr_windows.h
+++ b/src/core/iomgr/iocp_windows.h
@@ -31,12 +31,22 @@
*
*/
-#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_WINDOWS_H_
-#define __GRPC_INTERNAL_IOMGR_IOMGR_WINDOWS_H_
+#ifndef __GRPC_INTERNAL_IOMGR_IOCP_WINDOWS_H_
+#define __GRPC_INTERNAL_IOMGR_IOCP_WINDOWS_H_
+
+#include <windows.h>
+#include <grpc/support/sync.h>
#include "src/core/iomgr/socket_windows.h"
-void grpc_pollset_global_init(void);
-void grpc_pollset_global_shutdown(void);
+void grpc_iocp_init(void);
+void grpc_iocp_shutdown(void);
+void grpc_iocp_add_socket(grpc_winsocket *);
+
+void grpc_socket_notify_on_write(grpc_winsocket *, void(*cb)(void *, int success),
+ void *opaque);
+
+void grpc_socket_notify_on_read(grpc_winsocket *, void(*cb)(void *, int success),
+ void *opaque);
-#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_WINDOWS_H_ */
+#endif /* __GRPC_INTERNAL_IOMGR_IOCP_WINDOWS_H_ */
diff --git a/src/core/iomgr/iomgr_windows.c b/src/core/iomgr/iomgr_windows.c
index 5c8382e1c0..a3a255eaed 100644
--- a/src/core/iomgr/iomgr_windows.c
+++ b/src/core/iomgr/iomgr_windows.c
@@ -40,8 +40,8 @@
#include <grpc/support/log.h>
#include "src/core/iomgr/socket_windows.h"
+#include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/iomgr_windows.h"
static void winsock_init(void) {
WSADATA wsaData;
@@ -56,11 +56,11 @@ static void winsock_shutdown(void) {
void grpc_iomgr_platform_init(void) {
winsock_init();
- grpc_pollset_global_init();
+ grpc_iocp_init();
}
void grpc_iomgr_platform_shutdown(void) {
- grpc_pollset_global_shutdown();
+ grpc_iocp_shutdown();
winsock_shutdown();
}
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 134e6f45e2..b81d23e57c 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -35,106 +35,20 @@
#ifdef GPR_WINSOCK_SOCKET
-#include <winsock2.h>
-
-#include <grpc/support/log.h>
-#include <grpc/support/log_win32.h>
-#include <grpc/support/alloc.h>
#include <grpc/support/thd.h>
#include "src/core/iomgr/alarm_internal.h"
-#include "src/core/iomgr/socket_windows.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset_windows.h"
-static grpc_pollset g_global_pollset;
-static ULONG g_pollset_kick_token;
-static OVERLAPPED g_pollset_custom_overlap;
-
-static gpr_event g_shutdown_global_poller;
-static gpr_event g_global_poller_done;
-
void grpc_pollset_init(grpc_pollset *pollset) {
gpr_mu_init(&pollset->mu);
gpr_cv_init(&pollset->cv);
- pollset->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
- (ULONG_PTR)NULL, 0);
- GPR_ASSERT(pollset->iocp);
}
void grpc_pollset_destroy(grpc_pollset *pollset) {
- BOOL status;
gpr_mu_destroy(&pollset->mu);
gpr_cv_destroy(&pollset->cv);
- status = CloseHandle(pollset->iocp);
- GPR_ASSERT(status);
-}
-
-static int pollset_poll(grpc_pollset *pollset,
- gpr_timespec deadline, gpr_timespec now) {
- BOOL success;
- DWORD bytes = 0;
- DWORD flags = 0;
- ULONG_PTR completion_key;
- LPOVERLAPPED overlapped;
- gpr_timespec wait_time = gpr_time_sub(deadline, now);
- grpc_winsocket *socket;
- grpc_winsocket_callback_info *info;
- void(*f)(void *, int) = NULL;
- void *opaque = NULL;
- gpr_mu_unlock(&pollset->mu);
- success = GetQueuedCompletionStatus(pollset->iocp, &bytes,
- &completion_key, &overlapped,
- gpr_time_to_millis(wait_time));
- gpr_mu_lock(&pollset->mu);
- if (!success && !overlapped) {
- /* The deadline got attained. */
- return 0;
- }
- GPR_ASSERT(completion_key && overlapped);
- if (overlapped == &g_pollset_custom_overlap) {
- if (completion_key == (ULONG_PTR) &g_pollset_kick_token) {
- /* We were awoken from a kick. */
- gpr_log(GPR_DEBUG, "pollset_poll - got a kick");
- return 1;
- }
- gpr_log(GPR_ERROR, "Unknown custom completion key.");
- abort();
- }
-
- GPR_ASSERT(pollset == &g_global_pollset);
-
- socket = (grpc_winsocket*) completion_key;
- if (overlapped == &socket->write_info.overlapped) {
- gpr_log(GPR_DEBUG, "pollset_poll - got write packet");
- info = &socket->write_info;
- } else if (overlapped == &socket->read_info.overlapped) {
- gpr_log(GPR_DEBUG, "pollset_poll - got read packet");
- info = &socket->read_info;
- } else {
- gpr_log(GPR_ERROR, "Unknown IOCP operation");
- abort();
- }
- success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
- FALSE, &flags);
- gpr_log(GPR_DEBUG, "bytes: %u, flags: %u - op %s", bytes, flags,
- success ? "succeeded" : "failed");
- info->bytes_transfered = bytes;
- info->wsa_error = success ? 0 : WSAGetLastError();
- GPR_ASSERT(overlapped == &info->overlapped);
- gpr_mu_lock(&socket->state_mu);
- GPR_ASSERT(!info->has_pending_iocp);
- if (info->cb) {
- f = info->cb;
- opaque = info->opaque;
- info->cb = NULL;
- } else {
- info->has_pending_iocp = 1;
- }
- gpr_mu_unlock(&socket->state_mu);
- if (f) f(opaque, 1);
-
- return 1;
}
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
@@ -149,93 +63,9 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
if (grpc_alarm_check(NULL, now, &deadline)) {
return 1;
}
- return pollset_poll(pollset, deadline, now);
+ return 0;
}
-void grpc_pollset_kick(grpc_pollset *pollset) {
- BOOL status;
- status = PostQueuedCompletionStatus(pollset->iocp, 0,
- (ULONG_PTR) &g_pollset_kick_token,
- &g_pollset_custom_overlap);
- GPR_ASSERT(status);
-}
-
-static void global_poller(void *p) {
- while (!gpr_event_get(&g_shutdown_global_poller)) {
- gpr_mu_lock(&g_global_pollset.mu);
- grpc_pollset_work(&g_global_pollset, gpr_inf_future);
- gpr_mu_unlock(&g_global_pollset.mu);
- }
-
- gpr_event_set(&g_global_poller_done, (void *) 1);
-}
-
-void grpc_pollset_global_init(void) {
- gpr_thd_id id;
-
- grpc_pollset_init(&g_global_pollset);
- gpr_event_init(&g_global_poller_done);
- gpr_event_init(&g_shutdown_global_poller);
- gpr_thd_new(&id, global_poller, NULL, NULL);
-}
-
-void grpc_pollset_global_shutdown(void) {
- gpr_event_set(&g_shutdown_global_poller, (void *) 1);
- grpc_pollset_kick(&g_global_pollset);
- gpr_event_wait(&g_global_poller_done, gpr_inf_future);
- grpc_pollset_destroy(&g_global_pollset);
-}
-
-void grpc_pollset_add_handle(grpc_pollset *pollset, grpc_winsocket *socket) {
- HANDLE ret;
- if (socket->added_to_iocp) return;
- ret = CreateIoCompletionPort((HANDLE)socket->socket,
- g_global_pollset.iocp,
- (gpr_uintptr) socket, 0);
- if (!ret) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message);
- gpr_free(utf8_message);
- __debugbreak();
- abort();
- }
- socket->added_to_iocp = 1;
- GPR_ASSERT(ret == g_global_pollset.iocp);
-}
-
-static void handle_notify_on_iocp(grpc_winsocket *socket,
- void(*cb)(void *, int), void *opaque,
- grpc_winsocket_callback_info *info) {
- int run_now = 0;
- GPR_ASSERT(!info->cb);
- gpr_mu_lock(&socket->state_mu);
- if (info->has_pending_iocp) {
- run_now = 1;
- info->has_pending_iocp = 0;
- gpr_log(GPR_DEBUG, "handle_notify_on_iocp - runs now");
- } else {
- info->cb = cb;
- info->opaque = opaque;
- gpr_log(GPR_DEBUG, "handle_notify_on_iocp - queued");
- }
- gpr_mu_unlock(&socket->state_mu);
- if (run_now) cb(opaque, 1);
-}
-
-void grpc_handle_notify_on_write(grpc_winsocket *socket,
- void(*cb)(void *, int), void *opaque) {
- gpr_log(GPR_DEBUG, "grpc_handle_notify_on_write");
- handle_notify_on_iocp(socket, cb, opaque, &socket->write_info);
-}
-
-void grpc_handle_notify_on_read(grpc_winsocket *socket,
- void(*cb)(void *, int), void *opaque) {
- gpr_log(GPR_DEBUG, "grpc_handle_notify_on_read");
- handle_notify_on_iocp(socket, cb, opaque, &socket->read_info);
-}
-
-grpc_pollset *grpc_global_pollset(void) {
- return &g_global_pollset;
-}
+void grpc_pollset_kick(grpc_pollset *p) { }
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h
index 919af5d7b7..1a5e31f627 100644
--- a/src/core/iomgr/pollset_windows.h
+++ b/src/core/iomgr/pollset_windows.h
@@ -48,20 +48,9 @@ struct grpc_fd;
typedef struct grpc_pollset {
gpr_mu mu;
gpr_cv cv;
- HANDLE iocp;
} grpc_pollset;
#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
#define GRPC_POLLSET_CV(pollset) (&(pollset)->cv)
-void grpc_pollset_add_handle(grpc_pollset *, grpc_winsocket *);
-
-grpc_pollset *grpc_global_pollset(void);
-
-void grpc_handle_notify_on_write(grpc_winsocket *, void(*cb)(void *, int success),
- void *opaque);
-
-void grpc_handle_notify_on_read(grpc_winsocket *, void(*cb)(void *, int success),
- void *opaque);
-
#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_WINDOWS_H_ */
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index 805e96a0d1..3639798dbc 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -37,6 +37,7 @@
#ifdef GPR_WINSOCK_SOCKET
+#include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/socket_windows.h"
@@ -50,7 +51,7 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
r->socket = socket;
gpr_mu_init(&r->state_mu);
grpc_iomgr_ref();
- grpc_pollset_add_handle(grpc_global_pollset(), r);
+ grpc_iocp_add_socket(r);
return r;
}
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index 37e6b12552..2ed5f39b39 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -197,7 +197,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
ac->refs = 2;
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
- grpc_handle_notify_on_write(socket, on_connect, ac);
+ grpc_socket_notify_on_write(socket, on_connect, ac);
return;
failure:
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index 21901958d1..97f8fe41ea 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -201,7 +201,7 @@ static void start_accept(server_port *port) {
}
port->new_socket = sock;
- grpc_handle_notify_on_read(port->socket, on_accept, port);
+ grpc_socket_notify_on_read(port->socket, on_accept, port);
return;
failure:
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index bd0b2dd869..94d84f92b5 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -43,11 +43,12 @@
#include <grpc/support/slice_buffer.h>
#include <grpc/support/useful.h>
-#include "src/core/iomgr/tcp_client.h"
-#include "src/core/iomgr/socket_windows.h"
#include "src/core/iomgr/alarm.h"
+#include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/sockaddr.h"
#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/iomgr/socket_windows.h"
+#include "src/core/iomgr/tcp_client.h"
static int set_non_block(SOCKET sock) {
int status;
@@ -121,7 +122,7 @@ static void on_read(void *tcpp, int success) {
if (!success) {
tcp_unref(tcp);
- cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN);
+ cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
return;
}
@@ -194,7 +195,7 @@ static void win_notify_on_read(grpc_endpoint *ep,
if (status == 0) {
gpr_log(GPR_DEBUG, "got response immediately, but we're going to sleep");
- grpc_handle_notify_on_read(tcp->socket, on_read, tcp);
+ grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
return;
}
@@ -213,7 +214,7 @@ static void win_notify_on_read(grpc_endpoint *ep,
}
gpr_log(GPR_DEBUG, "waiting on the IO completion port now");
- grpc_handle_notify_on_read(tcp->socket, on_read, tcp);
+ grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
}
static void on_write(void *tcpp, int success) {
@@ -333,14 +334,14 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
gpr_log(GPR_DEBUG, "wrote data immediately - but we're going to sleep");
}
- grpc_handle_notify_on_write(socket, on_write, tcp);
+ grpc_socket_notify_on_write(socket, on_write, tcp);
return GRPC_ENDPOINT_WRITE_PENDING;
}
static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
grpc_tcp *tcp = (grpc_tcp *) ep;
gpr_log(GPR_DEBUG, "win_add_to_pollset");
- grpc_pollset_add_handle(pollset, tcp->socket);
+ grpc_iocp_add_socket(tcp->socket);
}
static void win_shutdown(grpc_endpoint *ep) {