diff options
Diffstat (limited to 'src/core/iomgr/tcp_windows.c')
-rw-r--r-- | src/core/iomgr/tcp_windows.c | 80 |
1 files changed, 41 insertions, 39 deletions
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index ab309180eb..4a531b8546 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -82,6 +82,9 @@ typedef struct grpc_tcp { /* Refcounting how many operations are in progress. */ gpr_refcount refcount; + grpc_closure on_read; + grpc_closure on_write; + grpc_closure *read_cb; grpc_closure *write_cb; gpr_slice read_slice; @@ -135,7 +138,9 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif /* Asynchronous callback from the IOCP, or the background thread. */ -static int on_read(grpc_tcp *tcp, int success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, int success) { + grpc_tcp *tcp = tcpp; + grpc_closure *cb = tcp->read_cb; grpc_winsocket *socket = tcp->socket; gpr_slice sub; gpr_slice *slice = NULL; @@ -164,23 +169,17 @@ static int on_read(grpc_tcp *tcp, int success) { } } - return success; -} - -static void on_read_cb(void *tcpp, int from_iocp) { - grpc_tcp *tcp = tcpp; - grpc_closure *cb = tcp->read_cb; - int success = on_read(tcp, from_iocp); tcp->read_cb = NULL; TCP_UNREF(tcp, "read"); if (cb) { - cb->cb(cb->cb_arg, success); + cb->cb(exec_ctx, cb->cb_arg, success); } } -static grpc_endpoint_op_status win_read(grpc_endpoint *ep, - gpr_slice_buffer *read_slices, - grpc_closure *cb) { +static void win_read(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + gpr_slice_buffer *read_slices, + grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->read_info; @@ -190,7 +189,8 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep, WSABUF buffer; if (tcp->shutting_down) { - return GRPC_ENDPOINT_ERROR; + grpc_exec_ctx_enqueue(exec_ctx, cb, 0); + return; } tcp->read_cb = cb; @@ -202,6 +202,8 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep, buffer.len = GPR_SLICE_LENGTH(tcp->read_slice); buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice); + TCP_REF(tcp, "read"); + /* First let's try a synchronous, non-blocking read. */ status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, NULL, NULL); @@ -209,14 +211,11 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep, /* Did we get data immediately ? Yay. */ if (info->wsa_error != WSAEWOULDBLOCK) { - int ok; info->bytes_transfered = bytes_read; - ok = on_read(tcp, 1); - return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; + grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, 1); + return; } - TCP_REF(tcp, "read"); - /* Otherwise, let's retry, by queuing a read. */ memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED)); status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, @@ -225,19 +224,17 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep, if (status != 0) { int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { - int ok; info->wsa_error = wsa_error; - ok = on_read(tcp, 1); - return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; + grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, 0); + return; } } - grpc_socket_notify_on_read(tcp->socket, on_read_cb, tcp); - return GRPC_ENDPOINT_PENDING; + grpc_socket_notify_on_read(exec_ctx, tcp->socket, &tcp->on_read); } /* Asynchronous callback from the IOCP, or the background thread. */ -static void on_write(void *tcpp, int success) { +static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, int success) { grpc_tcp *tcp = (grpc_tcp *)tcpp; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->write_info; @@ -263,13 +260,14 @@ static void on_write(void *tcpp, int success) { } TCP_UNREF(tcp, "write"); - cb->cb(cb->cb_arg, success); + cb->cb(exec_ctx, cb->cb_arg, success); } /* Initiates a write. */ -static grpc_endpoint_op_status win_write(grpc_endpoint *ep, - gpr_slice_buffer *slices, - grpc_closure *cb) { +static void win_write(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + gpr_slice_buffer *slices, + grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *socket = tcp->socket; grpc_winsocket_callback_info *info = &socket->write_info; @@ -281,7 +279,8 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep, WSABUF *buffers = local_buffers; if (tcp->shutting_down) { - return GRPC_ENDPOINT_ERROR; + grpc_exec_ctx_enqueue(exec_ctx, cb, 0); + return; } tcp->write_cb = cb; @@ -306,9 +305,9 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep, connection that has its send queue filled up. But if we don't, then we can avoid doing an async write operation at all. */ if (info->wsa_error != WSAEWOULDBLOCK) { - grpc_endpoint_op_status ret = GRPC_ENDPOINT_ERROR; + int ok = 0; if (status == 0) { - ret = GRPC_ENDPOINT_DONE; + ok = 1; GPR_ASSERT(bytes_sent == tcp->write_slices->length); } else { if (socket->read_info.wsa_error != WSAECONNRESET) { @@ -318,7 +317,8 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep, } } if (allocated) gpr_free(allocated); - return ret; + grpc_exec_ctx_enqueue(exec_ctx, cb, ok); + return; } TCP_REF(tcp, "write"); @@ -334,24 +334,24 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { TCP_UNREF(tcp, "write"); - return GRPC_ENDPOINT_ERROR; + grpc_exec_ctx_enqueue(exec_ctx, cb, 0); + return; } } /* As all is now setup, we can now ask for the IOCP notification. It may trigger the callback immediately however, but no matter. */ - grpc_socket_notify_on_write(socket, on_write, tcp); - return GRPC_ENDPOINT_PENDING; + grpc_socket_notify_on_write(exec_ctx, socket, &tcp->on_write); } -static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *ps) { +static void win_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset *ps) { grpc_tcp *tcp; (void)ps; tcp = (grpc_tcp *)ep; grpc_iocp_add_socket(tcp->socket); } -static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) { +static void win_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pss) { grpc_tcp *tcp; (void)pss; tcp = (grpc_tcp *)ep; @@ -364,7 +364,7 @@ static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) { we're not going to protect against these. However the IO Completion Port callback will happen from another thread, so we need to protect against concurrent access of the data structure in that regard. */ -static void win_shutdown(grpc_endpoint *ep) { +static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; gpr_mu_lock(&tcp->mu); /* At that point, what may happen is that we're already inside the IOCP @@ -374,7 +374,7 @@ static void win_shutdown(grpc_endpoint *ep) { gpr_mu_unlock(&tcp->mu); } -static void win_destroy(grpc_endpoint *ep) { +static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; TCP_UNREF(tcp, "destroy"); } @@ -395,6 +395,8 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { tcp->socket = socket; gpr_mu_init(&tcp->mu); gpr_ref_init(&tcp->refcount, 1); + grpc_closure_init(&tcp->on_read, on_read, tcp); + grpc_closure_init(&tcp->on_read, on_write, tcp); tcp->peer_string = gpr_strdup(peer_string); return &tcp->base; } |