aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_windows.cc
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-06 09:05:05 -0800
committerGravatar GitHub <noreply@github.com>2017-12-06 09:05:05 -0800
commitad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch)
tree6a657f8c6179d873b34505cdc24bce9462ca68eb /src/core/lib/iomgr/tcp_windows.cc
parenta3df36cc2505a89c2f481eea4a66a87b3002844a (diff)
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'src/core/lib/iomgr/tcp_windows.cc')
-rw-r--r--src/core/lib/iomgr/tcp_windows.cc93
1 files changed, 50 insertions, 43 deletions
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index 6d091b77bb..33868cdc7a 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -109,20 +109,21 @@ typedef struct grpc_tcp {
char* peer_string;
} grpc_tcp;
-static void tcp_free(grpc_tcp* tcp) {
+static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
grpc_winsocket_destroy(tcp->socket);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
- grpc_resource_user_unref(tcp->resource_user);
+ grpc_resource_user_unref(exec_ctx, tcp->resource_user);
if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error);
gpr_free(tcp);
}
#ifndef NDEBUG
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
+#define TCP_UNREF(exec_ctx, tcp, reason) \
+ tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file,
- int line) {
+static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
+ const char* reason, const char* file, int line) {
if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -130,7 +131,7 @@ static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file,
val - 1);
}
if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+ tcp_free(exec_ctx, tcp);
}
}
@@ -145,11 +146,11 @@ static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
gpr_ref(&tcp->refcount);
}
#else
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
+#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
-static void tcp_unref(grpc_tcp* tcp) {
+static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+ tcp_free(exec_ctx, tcp);
}
}
@@ -157,7 +158,7 @@ static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
#endif
/* Asynchronous callback from the IOCP, or the background thread. */
-static void on_read(void* tcpp, grpc_error* error) {
+static void on_read(grpc_exec_ctx* exec_ctx, void* tcpp, grpc_error* error) {
grpc_tcp* tcp = (grpc_tcp*)tcpp;
grpc_closure* cb = tcp->read_cb;
grpc_winsocket* socket = tcp->socket;
@@ -171,13 +172,13 @@ static void on_read(void* tcpp, grpc_error* error) {
char* utf8_message = gpr_format_message(info->wsa_error);
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(utf8_message);
gpr_free(utf8_message);
- grpc_slice_unref_internal(tcp->read_slice);
+ grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
} else {
if (info->bytes_transfered != 0 && !tcp->shutting_down) {
sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
grpc_slice_buffer_add(tcp->read_slices, sub);
} else {
- grpc_slice_unref_internal(tcp->read_slice);
+ grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
error = tcp->shutting_down
? GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP stream shutting down", &tcp->shutdown_error, 1)
@@ -187,12 +188,12 @@ static void on_read(void* tcpp, grpc_error* error) {
}
tcp->read_cb = NULL;
- TCP_UNREF(tcp, "read");
- GRPC_CLOSURE_SCHED(cb, error);
+ TCP_UNREF(exec_ctx, tcp, "read");
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
}
-static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
- grpc_closure* cb) {
+static void win_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_slice_buffer* read_slices, grpc_closure* cb) {
grpc_tcp* tcp = (grpc_tcp*)ep;
grpc_winsocket* handle = tcp->socket;
grpc_winsocket_callback_info* info = &handle->read_info;
@@ -203,14 +204,15 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
if (tcp->shutting_down) {
GRPC_CLOSURE_SCHED(
- cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "TCP socket is shutting down", &tcp->shutdown_error, 1));
+ exec_ctx, cb,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "TCP socket is shutting down", &tcp->shutdown_error, 1));
return;
}
tcp->read_cb = cb;
tcp->read_slices = read_slices;
- grpc_slice_buffer_reset_and_unref_internal(read_slices);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, read_slices);
tcp->read_slice = GRPC_SLICE_MALLOC(8192);
@@ -228,7 +230,7 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read;
- GRPC_CLOSURE_SCHED(&tcp->on_read, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE);
return;
}
@@ -241,17 +243,17 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
info->wsa_error = wsa_error;
- GRPC_CLOSURE_SCHED(&tcp->on_read,
+ GRPC_CLOSURE_SCHED(exec_ctx, &tcp->on_read,
GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
return;
}
}
- grpc_socket_notify_on_read(tcp->socket, &tcp->on_read);
+ grpc_socket_notify_on_read(exec_ctx, tcp->socket, &tcp->on_read);
}
/* Asynchronous callback from the IOCP, or the background thread. */
-static void on_write(void* tcpp, grpc_error* error) {
+static void on_write(grpc_exec_ctx* exec_ctx, void* tcpp, grpc_error* error) {
grpc_tcp* tcp = (grpc_tcp*)tcpp;
grpc_winsocket* handle = tcp->socket;
grpc_winsocket_callback_info* info = &handle->write_info;
@@ -272,13 +274,13 @@ static void on_write(void* tcpp, grpc_error* error) {
}
}
- TCP_UNREF(tcp, "write");
- GRPC_CLOSURE_SCHED(cb, error);
+ TCP_UNREF(exec_ctx, tcp, "write");
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
}
/* Initiates a write. */
-static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+static void win_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_slice_buffer* slices, grpc_closure* cb) {
grpc_tcp* tcp = (grpc_tcp*)ep;
grpc_winsocket* socket = tcp->socket;
grpc_winsocket_callback_info* info = &socket->write_info;
@@ -292,8 +294,9 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
if (tcp->shutting_down) {
GRPC_CLOSURE_SCHED(
- cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "TCP socket is shutting down", &tcp->shutdown_error, 1));
+ exec_ctx, cb,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "TCP socket is shutting down", &tcp->shutdown_error, 1));
return;
}
@@ -324,7 +327,7 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_error* error = status == 0
? GRPC_ERROR_NONE
: GRPC_WSA_ERROR(info->wsa_error, "WSASend");
- GRPC_CLOSURE_SCHED(cb, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
if (allocated) gpr_free(allocated);
return;
}
@@ -341,32 +344,35 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
- TCP_UNREF(tcp, "write");
- GRPC_CLOSURE_SCHED(cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
+ TCP_UNREF(exec_ctx, tcp, "write");
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
return;
}
}
/* As all is now setup, we can now ask for the IOCP notification. It may
trigger the callback immediately however, but no matter. */
- grpc_socket_notify_on_write(socket, &tcp->on_write);
+ grpc_socket_notify_on_write(exec_ctx, socket, &tcp->on_write);
}
-static void win_add_to_pollset(grpc_endpoint* ep, grpc_pollset* ps) {
+static void win_add_to_pollset(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_pollset* ps) {
grpc_tcp* tcp;
(void)ps;
tcp = (grpc_tcp*)ep;
grpc_iocp_add_socket(tcp->socket);
}
-static void win_add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pss) {
+static void win_add_to_pollset_set(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_pollset_set* pss) {
grpc_tcp* tcp;
(void)pss;
tcp = (grpc_tcp*)ep;
grpc_iocp_add_socket(tcp->socket);
}
-static void win_delete_from_pollset_set(grpc_endpoint* ep,
+static void win_delete_from_pollset_set(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* ep,
grpc_pollset_set* pss) {}
/* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
@@ -375,7 +381,8 @@ static void win_delete_from_pollset_set(grpc_endpoint* ep,
we're not going to protect against these. However the IO Completion Port
callback will happen from another thread, so we need to protect against
concurrent access of the data structure in that regard. */
-static void win_shutdown(grpc_endpoint* ep, grpc_error* why) {
+static void win_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_error* why) {
grpc_tcp* tcp = (grpc_tcp*)ep;
gpr_mu_lock(&tcp->mu);
/* At that point, what may happen is that we're already inside the IOCP
@@ -388,13 +395,13 @@ static void win_shutdown(grpc_endpoint* ep, grpc_error* why) {
}
grpc_winsocket_shutdown(tcp->socket);
gpr_mu_unlock(&tcp->mu);
- grpc_resource_user_shutdown(tcp->resource_user);
+ grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
}
-static void win_destroy(grpc_endpoint* ep) {
+static void win_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp* tcp = (grpc_tcp*)ep;
- TCP_UNREF(tcp, "destroy");
+ TCP_UNREF(exec_ctx, tcp, "destroy");
}
static char* win_get_peer(grpc_endpoint* ep) {
@@ -420,14 +427,14 @@ static grpc_endpoint_vtable vtable = {win_read,
win_get_peer,
win_get_fd};
-grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
+grpc_endpoint* grpc_tcp_create(grpc_exec_ctx* exec_ctx, grpc_winsocket* socket,
grpc_channel_args* channel_args,
const char* peer_string) {
grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
- grpc_resource_quota_unref_internal(resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
resource_quota = grpc_resource_quota_ref_internal(
(grpc_resource_quota*)channel_args->args[i].value.pointer.p);
}
@@ -445,7 +452,7 @@ grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
- grpc_resource_quota_unref_internal(resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
return &tcp->base;
}