aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_windows.cc
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-10-13 16:07:13 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2017-10-18 17:12:19 -0700
commit0ee7574732a06e8cace4e099a678f4bd5dbff679 (patch)
treee43d5de442fdcc3d39cd5af687f319fa39612d3f /src/core/lib/iomgr/tcp_windows.cc
parent6bf5f833efe2cb9e2ecc14358dd9699cd5d05263 (diff)
Removing instances of exec_ctx being passed around in functions in
src/core. exec_ctx is now a thread_local pointer of type ExecCtx instead of grpc_exec_ctx which is initialized whenever ExecCtx is instantiated. ExecCtx also keeps track of the previous exec_ctx so that nesting of exec_ctx is allowed. This means that there is only one exec_ctx being used at any time. Also, grpc_exec_ctx_finish is called in the destructor of the object, and the previous exec_ctx is restored to avoid breaking current functionality. The code still explicitly calls grpc_exec_ctx_finish because removing all such instances causes the code to break.
Diffstat (limited to 'src/core/lib/iomgr/tcp_windows.cc')
-rw-r--r--src/core/lib/iomgr/tcp_windows.cc88
1 files changed, 41 insertions, 47 deletions
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index dc84e564a9..baa0a1895e 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -109,21 +109,20 @@ typedef struct grpc_tcp {
char *peer_string;
} grpc_tcp;
-static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+static void tcp_free(grpc_tcp *tcp) {
grpc_winsocket_destroy(tcp->socket);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
- grpc_resource_user_unref(exec_ctx, tcp->resource_user);
+ grpc_resource_user_unref(tcp->resource_user);
if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error);
gpr_free(tcp);
}
#ifndef NDEBUG
-#define TCP_UNREF(exec_ctx, tcp, reason) \
- tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
- const char *reason, const char *file, int line) {
+static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
+ int line) {
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -131,7 +130,7 @@ static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
val - 1);
}
if (gpr_unref(&tcp->refcount)) {
- tcp_free(exec_ctx, tcp);
+ tcp_free(tcp);
}
}
@@ -146,11 +145,11 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
gpr_ref(&tcp->refcount);
}
#else
-#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp))
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
-static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+static void tcp_unref(grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) {
- tcp_free(exec_ctx, tcp);
+ tcp_free(tcp);
}
}
@@ -158,7 +157,7 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
/* Asynchronous callback from the IOCP, or the background thread. */
-static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
+static void on_read(void *tcpp, grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_closure *cb = tcp->read_cb;
grpc_winsocket *socket = tcp->socket;
@@ -172,13 +171,13 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
char *utf8_message = gpr_format_message(info->wsa_error);
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(utf8_message);
gpr_free(utf8_message);
- grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
+ grpc_slice_unref_internal(tcp->read_slice);
} else {
if (info->bytes_transfered != 0 && !tcp->shutting_down) {
sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
grpc_slice_buffer_add(tcp->read_slices, sub);
} else {
- grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
+ grpc_slice_unref_internal(tcp->read_slice);
error = tcp->shutting_down
? GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP stream shutting down", &tcp->shutdown_error, 1)
@@ -188,12 +187,12 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
}
tcp->read_cb = NULL;
- TCP_UNREF(exec_ctx, tcp, "read");
- GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
+ TCP_UNREF(tcp, "read");
+ GRPC_CLOSURE_SCHED(cb, error);
}
-static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- grpc_slice_buffer *read_slices, grpc_closure *cb) {
+static void win_read(grpc_endpoint *ep, grpc_slice_buffer *read_slices,
+ grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->read_info;
@@ -204,15 +203,14 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (tcp->shutting_down) {
GRPC_CLOSURE_SCHED(
- exec_ctx, cb,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "TCP socket is shutting down", &tcp->shutdown_error, 1));
+ cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "TCP socket is shutting down", &tcp->shutdown_error, 1));
return;
}
tcp->read_cb = cb;
tcp->read_slices = read_slices;
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, read_slices);
+ grpc_slice_buffer_reset_and_unref_internal(read_slices);
tcp->read_slice = GRPC_SLICE_MALLOC(8192);
@@ -230,7 +228,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read;
- GRPC_CLOSURE_SCHED(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&tcp->on_read, GRPC_ERROR_NONE);
return;
}
@@ -243,17 +241,17 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
info->wsa_error = wsa_error;
- GRPC_CLOSURE_SCHED(exec_ctx, &tcp->on_read,
+ GRPC_CLOSURE_SCHED(&tcp->on_read,
GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
return;
}
}
- grpc_socket_notify_on_read(exec_ctx, tcp->socket, &tcp->on_read);
+ grpc_socket_notify_on_read(tcp->socket, &tcp->on_read);
}
/* Asynchronous callback from the IOCP, or the background thread. */
-static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
+static void on_write(void *tcpp, grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
@@ -274,13 +272,13 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
}
}
- TCP_UNREF(exec_ctx, tcp, "write");
- GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
+ TCP_UNREF(tcp, "write");
+ GRPC_CLOSURE_SCHED(cb, error);
}
/* Initiates a write. */
-static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- grpc_slice_buffer *slices, grpc_closure *cb) {
+static void win_write(grpc_endpoint *ep, grpc_slice_buffer *slices,
+ grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *socket = tcp->socket;
grpc_winsocket_callback_info *info = &socket->write_info;
@@ -294,9 +292,8 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (tcp->shutting_down) {
GRPC_CLOSURE_SCHED(
- exec_ctx, cb,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "TCP socket is shutting down", &tcp->shutdown_error, 1));
+ cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "TCP socket is shutting down", &tcp->shutdown_error, 1));
return;
}
@@ -327,7 +324,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *error = status == 0
? GRPC_ERROR_NONE
: GRPC_WSA_ERROR(info->wsa_error, "WSASend");
- GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(cb, error);
if (allocated) gpr_free(allocated);
return;
}
@@ -344,27 +341,25 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
- TCP_UNREF(exec_ctx, tcp, "write");
- GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
+ TCP_UNREF(tcp, "write");
+ GRPC_CLOSURE_SCHED(cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
return;
}
}
/* As all is now setup, we can now ask for the IOCP notification. It may
trigger the callback immediately however, but no matter. */
- grpc_socket_notify_on_write(exec_ctx, socket, &tcp->on_write);
+ grpc_socket_notify_on_write(socket, &tcp->on_write);
}
-static void win_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- grpc_pollset *ps) {
+static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *ps) {
grpc_tcp *tcp;
(void)ps;
tcp = (grpc_tcp *)ep;
grpc_iocp_add_socket(tcp->socket);
}
-static void win_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- grpc_pollset_set *pss) {
+static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) {
grpc_tcp *tcp;
(void)pss;
tcp = (grpc_tcp *)ep;
@@ -377,8 +372,7 @@ static void win_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
we're not going to protect against these. However the IO Completion Port
callback will happen from another thread, so we need to protect against
concurrent access of the data structure in that regard. */
-static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- grpc_error *why) {
+static void win_shutdown(grpc_endpoint *ep, grpc_error *why) {
grpc_tcp *tcp = (grpc_tcp *)ep;
gpr_mu_lock(&tcp->mu);
/* At that point, what may happen is that we're already inside the IOCP
@@ -391,13 +385,13 @@ static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
grpc_winsocket_shutdown(tcp->socket);
gpr_mu_unlock(&tcp->mu);
- grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
+ grpc_resource_user_shutdown(tcp->resource_user);
}
-static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+static void win_destroy(grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
- TCP_UNREF(exec_ctx, tcp, "destroy");
+ TCP_UNREF(tcp, "destroy");
}
static char *win_get_peer(grpc_endpoint *ep) {
@@ -417,14 +411,14 @@ static grpc_endpoint_vtable vtable = {
win_shutdown, win_destroy, win_get_resource_user, win_get_peer,
win_get_fd};
-grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
+grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
grpc_channel_args *channel_args,
const char *peer_string) {
grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
- grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(resource_quota);
resource_quota = grpc_resource_quota_ref_internal(
(grpc_resource_quota *)channel_args->args[i].value.pointer.p);
}