From a41ac571ab5a7a387683d29e793be3902f92aff5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 17 May 2016 16:08:17 -0700 Subject: Windows port of error system --- src/core/lib/iomgr/tcp_windows.c | 82 +++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 48 deletions(-) (limited to 'src/core/lib/iomgr/tcp_windows.c') diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 551149e1a6..f46b353f5a 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -61,27 +61,30 @@ #define GRPC_FIONBIO FIONBIO #endif -static int set_non_block(SOCKET sock) { +static grpc_error * set_non_block(SOCKET sock) { int status; uint32_t param = 1; DWORD ret; status = WSAIoctl(sock, GRPC_FIONBIO, ¶m, sizeof(param), NULL, 0, &ret, NULL, NULL); - return status == 0; + return status == 0 ? GRPC_ERROR_NONE : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)"); } -static int set_dualstack(SOCKET sock) { +static grpc_error * set_dualstack(SOCKET sock) { int status; unsigned long param = 0; status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char *)¶m, sizeof(param)); - return status == 0; + return status == 0 ? GRPC_ERROR_NONE : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)"); } -int grpc_tcp_prepare_socket(SOCKET sock) { - if (!set_non_block(sock)) return 0; - if (!set_dualstack(sock)) return 0; - return 1; +grpc_error * grpc_tcp_prepare_socket(SOCKET sock) { + grpc_error * err; + err = set_non_block(sock); + if (err != GRPC_ERROR_NONE) return err; + err = set_dualstack(sock); + if (err != GRPC_ERROR_NONE) return err; + return GRPC_ERROR_NONE; } typedef struct grpc_tcp { @@ -148,39 +151,36 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif /* Asynchronous callback from the IOCP, or the background thread. */ -static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, bool success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { grpc_tcp *tcp = tcpp; grpc_closure *cb = tcp->read_cb; grpc_winsocket *socket = tcp->socket; gpr_slice sub; grpc_winsocket_callback_info *info = &socket->read_info; - if (success) { + GRPC_ERROR_REF(error); + + if (error == GRPC_ERROR_NONE) { if (info->wsa_error != 0 && !tcp->shutting_down) { - if (info->wsa_error != WSAECONNRESET) { - char *utf8_message = gpr_format_message(info->wsa_error); - gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); - gpr_free(utf8_message); - } - success = 0; + char *utf8_message = gpr_format_message(info->wsa_error); + gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); + error = GRPC_ERROR_CREATE(utf8_message); + gpr_free(utf8_message); gpr_slice_unref(tcp->read_slice); } else { if (info->bytes_transfered != 0 && !tcp->shutting_down) { sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered); gpr_slice_buffer_add(tcp->read_slices, sub); - success = 1; } else { gpr_slice_unref(tcp->read_slice); - success = 0; + error = GRPC_ERROR_CREATE("End of TCP stream"); } } } tcp->read_cb = NULL; TCP_UNREF(tcp, "read"); - if (cb) { - cb->cb(exec_ctx, cb->cb_arg, success); - } + grpc_exec_ctx_push(exec_ctx, cb, error, NULL); } static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -194,7 +194,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, WSABUF buffer; if (tcp->shutting_down) { - grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL); + grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -218,7 +218,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, /* Did we get data immediately ? Yay. */ if (info->wsa_error != WSAEWOULDBLOCK) { info->bytes_transfered = bytes_read; - grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, true, NULL); + grpc_exec_ctx_push(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE, NULL); return; } @@ -231,7 +231,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { info->wsa_error = wsa_error; - grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, false, NULL); + grpc_exec_ctx_push(exec_ctx, &tcp->on_read, GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL); return; } } @@ -240,32 +240,29 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } /* Asynchronous callback from the IOCP, or the background thread. */ -static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, bool success) { +static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)tcpp; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->write_info; grpc_closure *cb; + GRPC_ERROR_REF(error); + gpr_mu_lock(&tcp->mu); cb = tcp->write_cb; tcp->write_cb = NULL; gpr_mu_unlock(&tcp->mu); - if (success) { + if (error == GRPC_ERROR_NONE) { if (info->wsa_error != 0) { - if (info->wsa_error != WSAECONNRESET) { - char *utf8_message = gpr_format_message(info->wsa_error); - gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message); - gpr_free(utf8_message); - } - success = 0; + error = GRPC_WSA_ERROR(info->wsa_error, "WSASend"); } else { GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length); } } TCP_UNREF(tcp, "write"); - cb->cb(exec_ctx, cb->cb_arg, success); + grpc_exec_ctx_push(exec_ctx, cb, error, NULL); } /* Initiates a write. */ @@ -283,7 +280,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, size_t len; if (tcp->shutting_down) { - grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL); + grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -311,19 +308,8 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, connection that has its send queue filled up. But if we don't, then we can avoid doing an async write operation at all. */ if (info->wsa_error != WSAEWOULDBLOCK) { - bool ok = false; - if (status == 0) { - ok = true; - GPR_ASSERT(bytes_sent == tcp->write_slices->length); - } else { - if (info->wsa_error != WSAECONNRESET) { - char *utf8_message = gpr_format_message(info->wsa_error); - gpr_log(GPR_ERROR, "WSASend error: %s", utf8_message); - gpr_free(utf8_message); - } - } - if (allocated) gpr_free(allocated); - grpc_exec_ctx_enqueue(exec_ctx, cb, ok, NULL); + grpc_error *error = status == 0 ? GRPC_ERROR_NONE : GRPC_WSA_ERROR(info->wsa_error, "WSASend"); + grpc_exec_ctx_push(exec_ctx, cb, error, NULL); return; } @@ -340,7 +326,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { TCP_UNREF(tcp, "write"); - grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL); + grpc_exec_ctx_push(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), NULL); return; } } -- cgit v1.2.3 From 3e149f3149cffab63bfe5a743ad43a00a45eb321 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 17 May 2016 16:11:04 -0700 Subject: clang-format --- src/core/ext/transport/chttp2/transport/parsing.c | 4 +-- src/core/lib/iomgr/error.c | 12 ++++----- src/core/lib/iomgr/error.h | 2 +- src/core/lib/iomgr/pollset_windows.c | 10 ++++--- src/core/lib/iomgr/resolve_address_windows.c | 27 ++++++++++++------- src/core/lib/iomgr/tcp_client_windows.c | 8 +++--- src/core/lib/iomgr/tcp_server_windows.c | 26 +++++++++++------- src/core/lib/iomgr/tcp_windows.c | 32 +++++++++++++++-------- src/core/lib/iomgr/unix_sockets_posix_noop.c | 3 ++- 9 files changed, 78 insertions(+), 46 deletions(-) (limited to 'src/core/lib/iomgr/tcp_windows.c') diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 32af8a6bb5..72b3131d7b 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -87,8 +87,8 @@ void grpc_chttp2_prepare_to_read( transport_global->settings[GRPC_SENT_SETTINGS], sizeof(transport_parsing->last_sent_settings)); transport_parsing->max_frame_size = - transport_global - ->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; + transport_global->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; /* update the parsing view of incoming window */ while (grpc_chttp2_list_pop_unannounced_incoming_window_available( diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 1cb4f48e60..05488af4bb 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -508,14 +508,14 @@ grpc_error *grpc_os_error(const char *file, int line, int err, #ifdef GPR_WIN32 grpc_error *grpc_wsa_error(const char *file, int line, int err, - const char *call_name) { + const char *call_name) { char *utf8_message = gpr_format_message(err); grpc_error *error = grpc_error_set_str( - grpc_error_set_str( - grpc_error_set_int(grpc_error_create(file, line, "OS Error", NULL, 0), - GRPC_ERROR_INT_WSA_ERROR, err), - GRPC_ERROR_STR_OS_ERROR, utf8_message), - GRPC_ERROR_STR_SYSCALL, call_name); + grpc_error_set_str( + grpc_error_set_int(grpc_error_create(file, line, "OS Error", NULL, 0), + GRPC_ERROR_INT_WSA_ERROR, err), + GRPC_ERROR_STR_OS_ERROR, utf8_message), + GRPC_ERROR_STR_SYSCALL, call_name); gpr_free(utf8_message); return error; } diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 64849ae011..bf275fa408 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -130,7 +130,7 @@ grpc_error *grpc_os_error(const char *file, int line, int err, #define GRPC_OS_ERROR(err, call_name) \ grpc_os_error(__FILE__, __LINE__, err, call_name) grpc_error *grpc_wsa_error(const char *file, int line, int err, - const char *call_name); + const char *call_name); #define GRPC_WSA_ERROR(err, call_name) \ grpc_wsa_error(__FILE__, __LINE__, err, call_name) diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index eecb4ac3ad..5882d8d71d 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -128,8 +128,8 @@ void grpc_pollset_reset(grpc_pollset *pollset) { } grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker_hdl, gpr_timespec now, - gpr_timespec deadline) { + grpc_pollset_worker **worker_hdl, + gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; *worker_hdl = &worker; @@ -167,7 +167,8 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } if (pollset->shutting_down && pollset->on_shutdown != NULL) { - grpc_exec_ctx_push(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_push(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE, + NULL); pollset->on_shutdown = NULL; } goto done; @@ -200,7 +201,8 @@ done: return GRPC_ERROR_NONE; } -grpc_error *grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { +grpc_error *grpc_pollset_kick(grpc_pollset *p, + grpc_pollset_worker *specific_worker) { if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { for (specific_worker = diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 92bd30ca93..66142da385 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -62,7 +62,8 @@ typedef struct { } request; static grpc_error *blocking_resolve_address_impl( - const char *name, const char *default_port, grpc_resolved_addresses **addresses) { + const char *name, const char *default_port, + grpc_resolved_addresses **addresses) { struct addrinfo hints; struct addrinfo *result = NULL, *resp; char *host; @@ -113,7 +114,8 @@ static grpc_error *blocking_resolve_address_impl( for (resp = result; resp != NULL; resp = resp->ai_next) { (*addresses)->naddrs++; } - (*addresses)->addrs = gpr_malloc(sizeof(grpc_resolved_address) * (*addresses)->naddrs); + (*addresses)->addrs = + gpr_malloc(sizeof(grpc_resolved_address) * (*addresses)->naddrs); i = 0; for (resp = result; resp != NULL; resp = resp->ai_next) { memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); @@ -124,8 +126,8 @@ static grpc_error *blocking_resolve_address_impl( { for (i = 0; i < (*addresses)->naddrs; i++) { char *buf; - grpc_sockaddr_to_string(&buf, (struct sockaddr *)&(*addresses)->addrs[i].addr, - 0); + grpc_sockaddr_to_string( + &buf, (struct sockaddr *)&(*addresses)->addrs[i].addr, 0); gpr_free(buf); } } @@ -140,14 +142,17 @@ done: } grpc_error *(*grpc_blocking_resolve_address)( - const char *name, const char *default_port, grpc_resolved_addresses **addresses) = blocking_resolve_address_impl; + const char *name, const char *default_port, + grpc_resolved_addresses **addresses) = blocking_resolve_address_impl; /* Callback to be passed to grpc_executor to asynch-ify * grpc_blocking_resolve_address */ -static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, grpc_error *error) { +static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, + grpc_error *error) { request *r = rp; if (error == GRPC_ERROR_NONE) { - error = grpc_blocking_resolve_address(r->name, r->default_port, r->addresses); + error = + grpc_blocking_resolve_address(r->name, r->default_port, r->addresses); } else { GRPC_ERROR_REF(error); } @@ -163,7 +168,9 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { } static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, - const char *default_port, grpc_closure *on_done, grpc_resolved_addresses **addresses) { + const char *default_port, + grpc_closure *on_done, + grpc_resolved_addresses **addresses) { request *r = gpr_malloc(sizeof(request)); grpc_closure_init(&r->request_closure, do_request_thread, r); r->name = gpr_strdup(name); @@ -174,6 +181,8 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, } void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name, - const char *default_port, grpc_closure *on_done, grpc_resolved_addresses **addresses) = resolve_address_impl; + const char *default_port, grpc_closure *on_done, + grpc_resolved_addresses **addresses) = + resolve_address_impl; #endif diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index 2d28eee6f6..a645e83b4b 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -170,7 +170,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, NULL, NULL); if (status != 0) { - error = GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)"); + error = GRPC_WSA_ERROR(WSAGetLastError(), + "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)"); goto failure; } @@ -214,8 +215,9 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, failure: GPR_ASSERT(error != GRPC_ERROR_NONE); char *target_uri = grpc_sockaddr_to_uri(addr); - grpc_error *final_error = grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING("Failed to connect", &error, 1), - GRPC_ERROR_STR_TARGET_ADDRESS, target_uri); + grpc_error *final_error = grpc_error_set_str( + GRPC_ERROR_CREATE_REFERENCING("Failed to connect", &error, 1), + GRPC_ERROR_STR_TARGET_ADDRESS, target_uri); GRPC_ERROR_UNREF(error); if (socket != NULL) { grpc_winsocket_destroy(socket); diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index a98fedc812..cb7240e0b6 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -102,7 +102,8 @@ struct grpc_tcp_server { /* Public function. Allocates the proper data structures to hold a grpc_tcp_server. */ -grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, grpc_tcp_server **server) { +grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, + grpc_tcp_server **server) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); gpr_ref_init(&s->refs, 1); gpr_mu_init(&s->mu); @@ -144,7 +145,8 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, grpc_closure *shutdown_starting) { gpr_mu_lock(&s->mu); - grpc_closure_list_append(&s->shutdown_starting, shutdown_starting, GRPC_ERROR_NONE); + grpc_closure_list_append(&s->shutdown_starting, shutdown_starting, + GRPC_ERROR_NONE); gpr_mu_unlock(&s->mu); } @@ -189,7 +191,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { /* Prepare (bind) a recently-created socket for listening. */ static grpc_error *prepare_socket(SOCKET sock, const struct sockaddr *addr, - size_t addr_len, int *port) { + size_t addr_len, int *port) { struct sockaddr_storage sockname_temp; socklen_t sockname_len; grpc_error *error = GRPC_ERROR_NONE; @@ -222,7 +224,11 @@ static grpc_error *prepare_socket(SOCKET sock, const struct sockaddr *addr, failure: GPR_ASSERT(error != GRPC_ERROR_NONE); char *tgtaddr = grpc_sockaddr_to_uri(addr); - grpc_error *final_error = grpc_error_set_int( grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING("Failed to prepare server socket", &error, 1), GRPC_ERROR_STR_TARGET_ADDRESS, tgtaddr), GRPC_ERROR_INT_FD, (intptr_t)sock); + grpc_error *final_error = grpc_error_set_int( + grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING( + "Failed to prepare server socket", &error, 1), + GRPC_ERROR_STR_TARGET_ADDRESS, tgtaddr), + GRPC_ERROR_INT_FD, (intptr_t)sock); gpr_free(tgtaddr); GRPC_ERROR_UNREF(error); if (sock != INVALID_SOCKET) closesocket(sock); @@ -246,7 +252,8 @@ static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx, /* In order to do an async accept, we need to create a socket first which will be the one assigned to the new incoming connection. */ -static grpc_error *start_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *port) { +static grpc_error *start_accept(grpc_exec_ctx *exec_ctx, + grpc_tcp_listener *port) { SOCKET sock = INVALID_SOCKET; BOOL success; DWORD addrlen = sizeof(struct sockaddr_in6) + 16; @@ -384,9 +391,9 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, - const struct sockaddr *addr, - size_t addr_len, - unsigned port_index, grpc_tcp_listener **listener) { + const struct sockaddr *addr, + size_t addr_len, unsigned port_index, + grpc_tcp_listener **listener) { grpc_tcp_listener *sp = NULL; int port; int status; @@ -501,7 +508,8 @@ done: gpr_free(allocated_addr); if (error != GRPC_ERROR_NONE) { - grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING("Failed to add port to server", &error, 1); + grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING( + "Failed to add port to server", &error, 1); GRPC_ERROR_UNREF(error); error = error_out; } diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index f46b353f5a..e9a105f3e3 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -61,25 +61,29 @@ #define GRPC_FIONBIO FIONBIO #endif -static grpc_error * set_non_block(SOCKET sock) { +static grpc_error *set_non_block(SOCKET sock) { int status; uint32_t param = 1; DWORD ret; status = WSAIoctl(sock, GRPC_FIONBIO, ¶m, sizeof(param), NULL, 0, &ret, NULL, NULL); - return status == 0 ? GRPC_ERROR_NONE : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)"); + return status == 0 + ? GRPC_ERROR_NONE + : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)"); } -static grpc_error * set_dualstack(SOCKET sock) { +static grpc_error *set_dualstack(SOCKET sock) { int status; unsigned long param = 0; status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char *)¶m, sizeof(param)); - return status == 0 ? GRPC_ERROR_NONE : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)"); + return status == 0 + ? GRPC_ERROR_NONE + : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)"); } -grpc_error * grpc_tcp_prepare_socket(SOCKET sock) { - grpc_error * err; +grpc_error *grpc_tcp_prepare_socket(SOCKET sock) { + grpc_error *err; err = set_non_block(sock); if (err != GRPC_ERROR_NONE) return err; err = set_dualstack(sock); @@ -194,7 +198,8 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, WSABUF buffer; if (tcp->shutting_down) { - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); + grpc_exec_ctx_push(exec_ctx, cb, + GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -231,7 +236,8 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { info->wsa_error = wsa_error; - grpc_exec_ctx_push(exec_ctx, &tcp->on_read, GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL); + grpc_exec_ctx_push(exec_ctx, &tcp->on_read, + GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL); return; } } @@ -280,7 +286,8 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, size_t len; if (tcp->shutting_down) { - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); + grpc_exec_ctx_push(exec_ctx, cb, + GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -308,7 +315,9 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, connection that has its send queue filled up. But if we don't, then we can avoid doing an async write operation at all. */ if (info->wsa_error != WSAEWOULDBLOCK) { - grpc_error *error = status == 0 ? GRPC_ERROR_NONE : GRPC_WSA_ERROR(info->wsa_error, "WSASend"); + grpc_error *error = status == 0 + ? GRPC_ERROR_NONE + : GRPC_WSA_ERROR(info->wsa_error, "WSASend"); grpc_exec_ctx_push(exec_ctx, cb, error, NULL); return; } @@ -326,7 +335,8 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { TCP_UNREF(tcp, "write"); - grpc_exec_ctx_push(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), NULL); + grpc_exec_ctx_push(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), + NULL); return; } } diff --git a/src/core/lib/iomgr/unix_sockets_posix_noop.c b/src/core/lib/iomgr/unix_sockets_posix_noop.c index 250334b4af..56b47c3daf 100644 --- a/src/core/lib/iomgr/unix_sockets_posix_noop.c +++ b/src/core/lib/iomgr/unix_sockets_posix_noop.c @@ -44,7 +44,8 @@ void grpc_create_socketpair_if_unix(int sv[2]) { GPR_ASSERT(0); } -grpc_error *grpc_resolve_unix_domain_address(const char *name, grpc_resolved_addresses **addresses) { +grpc_error *grpc_resolve_unix_domain_address( + const char *name, grpc_resolved_addresses **addresses) { *addresses = NULL; return GRPC_ERROR_CREATE("Unix domain sockets are not supported on Windows"); } -- cgit v1.2.3 From 5352ff11d5bc1737bedf4344fb4dacdf8109ec7c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 18 May 2016 10:44:38 -0700 Subject: Windows fixes for error handling --- src/core/lib/iomgr/iocp_windows.c | 41 +---------------- src/core/lib/iomgr/iocp_windows.h | 8 ---- src/core/lib/iomgr/socket_windows.c | 58 +++++++++++++++++++++++- src/core/lib/iomgr/socket_windows.h | 11 +++++ src/core/lib/iomgr/tcp_client_windows.c | 12 ++--- src/core/lib/iomgr/tcp_server_windows.c | 6 ++- src/core/lib/iomgr/tcp_windows.c | 1 - test/core/surface/concurrent_connectivity_test.c | 1 + 8 files changed, 81 insertions(+), 57 deletions(-) (limited to 'src/core/lib/iomgr/tcp_windows.c') diff --git a/src/core/lib/iomgr/iocp_windows.c b/src/core/lib/iomgr/iocp_windows.c index 47bafd9e85..1faa8eeedf 100644 --- a/src/core/lib/iomgr/iocp_windows.c +++ b/src/core/lib/iomgr/iocp_windows.c @@ -104,7 +104,6 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, } else if (overlapped == &socket->read_info.overlapped) { info = &socket->read_info; } else { - gpr_log(GPR_ERROR, "Unknown IOCP operation"); abort(); } success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes, @@ -112,16 +111,7 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, info->bytes_transfered = bytes; info->wsa_error = success ? 0 : WSAGetLastError(); GPR_ASSERT(overlapped == &info->overlapped); - GPR_ASSERT(!info->has_pending_iocp); - gpr_mu_lock(&socket->state_mu); - if (info->closure) { - closure = info->closure; - info->closure = NULL; - } else { - info->has_pending_iocp = 1; - } - gpr_mu_unlock(&socket->state_mu); - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_socket_become_ready(exec_ctx, socket, info); return GRPC_IOCP_WORK_WORK; } @@ -176,33 +166,4 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) { GPR_ASSERT(ret == g_iocp); } -/* Calling notify_on_read or write means either of two things: - -) The IOCP already completed in the background, and we need to call - the callback now. - -) The IOCP hasn't completed yet, and we're queuing it for later. */ -static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx, - grpc_winsocket *socket, grpc_closure *closure, - grpc_winsocket_callback_info *info) { - GPR_ASSERT(info->closure == NULL); - gpr_mu_lock(&socket->state_mu); - if (info->has_pending_iocp) { - info->has_pending_iocp = 0; - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); - } else { - info->closure = closure; - } - gpr_mu_unlock(&socket->state_mu); -} - -void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx, - grpc_winsocket *socket, - grpc_closure *closure) { - socket_notify_on_iocp(exec_ctx, socket, closure, &socket->write_info); -} - -void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, - grpc_closure *closure) { - socket_notify_on_iocp(exec_ctx, socket, closure, &socket->read_info); -} - #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/iocp_windows.h b/src/core/lib/iomgr/iocp_windows.h index ae210fa7d7..011fee38ff 100644 --- a/src/core/lib/iomgr/iocp_windows.h +++ b/src/core/lib/iomgr/iocp_windows.h @@ -52,12 +52,4 @@ void grpc_iocp_flush(void); void grpc_iocp_shutdown(void); void grpc_iocp_add_socket(grpc_winsocket *); -void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx, - grpc_winsocket *winsocket, - grpc_closure *closure); - -void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx, - grpc_winsocket *winsocket, - grpc_closure *closure); - #endif /* GRPC_CORE_LIB_IOMGR_IOCP_WINDOWS_H */ diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c index ebd77e0372..062226285e 100644 --- a/src/core/lib/iomgr/socket_windows.c +++ b/src/core/lib/iomgr/socket_windows.c @@ -91,10 +91,66 @@ void grpc_winsocket_shutdown(grpc_winsocket *winsocket) { closesocket(winsocket->socket); } -void grpc_winsocket_destroy(grpc_winsocket *winsocket) { +static void destroy(grpc_winsocket *winsocket) { grpc_iomgr_unregister_object(&winsocket->iomgr_object); gpr_mu_destroy(&winsocket->state_mu); gpr_free(winsocket); } +static bool check_destroyable(grpc_winsocket *winsocket) { + return winsocket->destroy_called == true && winsocket->write_info.closure == NULL && winsocket->read_info.closure == NULL; +} + +void grpc_winsocket_destroy(grpc_winsocket *winsocket) { + gpr_mu_lock(&winsocket->state_mu); + GPR_ASSERT(!winsocket->destroy_called); + winsocket->destroy_called = true; + bool should_destroy = check_destroyable(winsocket); + gpr_mu_unlock(&winsocket->state_mu); + if (should_destroy) destroy(winsocket); +} + +/* Calling notify_on_read or write means either of two things: +-) The IOCP already completed in the background, and we need to call +the callback now. +-) The IOCP hasn't completed yet, and we're queuing it for later. */ +static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx, + grpc_winsocket *socket, grpc_closure *closure, + grpc_winsocket_callback_info *info) { + GPR_ASSERT(info->closure == NULL); + gpr_mu_lock(&socket->state_mu); + if (info->has_pending_iocp) { + info->has_pending_iocp = 0; + grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + } else { + info->closure = closure; + } + gpr_mu_unlock(&socket->state_mu); +} + +void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx, + grpc_winsocket *socket, + grpc_closure *closure) { + socket_notify_on_iocp(exec_ctx, socket, closure, &socket->write_info); +} + +void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, + grpc_closure *closure) { + socket_notify_on_iocp(exec_ctx, socket, closure, &socket->read_info); +} + +void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, grpc_winsocket_callback_info *info) { + GPR_ASSERT(!info->has_pending_iocp); + gpr_mu_lock(&socket->state_mu); + if (info->closure) { + grpc_exec_ctx_push(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL); + info->closure = NULL; + } else { + info->has_pending_iocp = 1; + } + bool should_destroy = check_destroyable(socket); + gpr_mu_unlock(&socket->state_mu); + if (should_destroy) destroy(socket); +} + #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h index 73c4384987..562f507037 100644 --- a/src/core/lib/iomgr/socket_windows.h +++ b/src/core/lib/iomgr/socket_windows.h @@ -81,6 +81,7 @@ typedef struct grpc_winsocket_callback_info { is closer to what happens in posix world. */ typedef struct grpc_winsocket { SOCKET socket; + bool destroy_called; grpc_winsocket_callback_info write_info; grpc_winsocket_callback_info read_info; @@ -108,4 +109,14 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket); /* Destroy a socket. Should only be called if there's no pending operation. */ void grpc_winsocket_destroy(grpc_winsocket *socket); +void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx, + grpc_winsocket *winsocket, + grpc_closure *closure); + +void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx, + grpc_winsocket *winsocket, + grpc_closure *closure); + +void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *winsocket, grpc_winsocket_callback_info *ci); + #endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */ diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index a645e83b4b..52196fbcc9 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -78,18 +78,18 @@ static void async_connect_unlock_and_cleanup(async_connect *ac, static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { async_connect *ac = acp; gpr_mu_lock(&ac->mu); - if (ac->socket != NULL) { - grpc_winsocket_shutdown(ac->socket); + grpc_winsocket *socket = ac->socket; + ac->socket = NULL; + if (socket != NULL) { + grpc_winsocket_shutdown(socket); } - async_connect_unlock_and_cleanup(ac, ac->socket); + async_connect_unlock_and_cleanup(ac, socket); } static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { async_connect *ac = acp; - SOCKET sock = ac->socket->socket; grpc_endpoint **ep = ac->endpoint; GPR_ASSERT(*ep == NULL); - grpc_winsocket_callback_info *info = &ac->socket->write_info; grpc_closure *on_done = ac->on_done; GRPC_ERROR_REF(error); @@ -106,7 +106,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { if (error == GRPC_ERROR_NONE && socket != NULL) { DWORD transfered_bytes = 0; DWORD flags; - BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, + BOOL wsa_success = WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped, &transfered_bytes, FALSE, &flags); GPR_ASSERT(transfered_bytes == 0); if (!wsa_success) { diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index cb7240e0b6..52db74e069 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -449,7 +449,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, size_t addr_len, int *port) { - grpc_tcp_listener *sp; + grpc_tcp_listener *sp = NULL; SOCKET sock; struct sockaddr_in6 addr6_v4mapped; struct sockaddr_in6 wildcard; @@ -512,6 +512,10 @@ done: "Failed to add port to server", &error, 1); GRPC_ERROR_UNREF(error); error = error_out; + *port = -1; + } else { + GPR_ASSERT(sp != NULL); + *port = sp->port; } return error; } diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index e9a105f3e3..88c9354f6e 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -167,7 +167,6 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { if (error == GRPC_ERROR_NONE) { if (info->wsa_error != 0 && !tcp->shutting_down) { char *utf8_message = gpr_format_message(info->wsa_error); - gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); error = GRPC_ERROR_CREATE(utf8_message); gpr_free(utf8_message); gpr_slice_unref(tcp->read_slice); diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c index 917c4758a6..74e3183c26 100644 --- a/test/core/surface/concurrent_connectivity_test.c +++ b/test/core/surface/concurrent_connectivity_test.c @@ -118,6 +118,7 @@ void bad_server_thread(void *vargs) { addr.ss_family = AF_INET; error = grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len, &port); + GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_tcp_server_add_port", error)); GPR_ASSERT(port > 0); gpr_asprintf(&args->addr, "localhost:%d", port); -- cgit v1.2.3 From 332f1b35d534f6910093729e9ecc2cacf8bb9688 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 May 2016 13:21:21 -0700 Subject: Rename functions --- src/core/ext/client_config/client_channel.c | 20 ++++++------ src/core/ext/client_config/subchannel.c | 2 +- .../ext/client_config/subchannel_call_holder.c | 4 +-- src/core/ext/lb_policy/pick_first/pick_first.c | 26 ++++++++-------- src/core/ext/lb_policy/round_robin/round_robin.c | 12 ++++---- src/core/ext/resolver/dns/native/dns_resolver.c | 4 +-- src/core/ext/resolver/sockaddr/sockaddr_resolver.c | 4 +-- .../chttp2/server/insecure/server_chttp2.c | 2 +- .../transport/chttp2/transport/chttp2_transport.c | 36 +++++++++++----------- src/core/ext/transport/chttp2/transport/writing.c | 2 +- .../transport/cronet/transport/cronet_transport.c | 4 +-- src/core/lib/http/httpcli.c | 2 +- src/core/lib/iomgr/ev_poll_posix.c | 10 +++--- src/core/lib/iomgr/exec_ctx.c | 2 +- src/core/lib/iomgr/exec_ctx.h | 6 ++-- src/core/lib/iomgr/pollset_windows.c | 4 +-- src/core/lib/iomgr/resolve_address_posix.c | 2 +- src/core/lib/iomgr/resolve_address_windows.c | 2 +- src/core/lib/iomgr/socket_windows.c | 4 +-- src/core/lib/iomgr/tcp_client_posix.c | 10 +++--- src/core/lib/iomgr/tcp_client_windows.c | 4 +-- src/core/lib/iomgr/tcp_posix.c | 8 ++--- src/core/lib/iomgr/tcp_server_posix.c | 2 +- src/core/lib/iomgr/tcp_server_windows.c | 2 +- src/core/lib/iomgr/tcp_windows.c | 16 +++++----- src/core/lib/iomgr/timer.c | 8 ++--- src/core/lib/iomgr/workqueue.h | 4 +-- src/core/lib/iomgr/workqueue_posix.c | 2 +- src/core/lib/security/transport/secure_endpoint.c | 4 +-- .../lib/security/transport/server_auth_filter.c | 6 ++-- src/core/lib/surface/call.c | 6 ++-- src/core/lib/surface/lame_client.c | 6 ++-- src/core/lib/surface/server.c | 22 ++++++------- src/core/lib/transport/connectivity_state.c | 10 +++--- src/core/lib/transport/transport.c | 8 ++--- test/core/end2end/fuzzers/api_fuzzer.c | 12 ++++---- test/core/end2end/tests/filter_causes_close.c | 2 +- test/core/internal_api_canaries/iomgr.c | 2 +- test/core/iomgr/workqueue_test.c | 4 +-- test/core/security/credentials_test.c | 12 ++++---- test/core/security/jwt_verifier_test.c | 10 +++--- test/core/util/mock_endpoint.c | 8 ++--- test/core/util/passthru_endpoint.c | 12 ++++---- 43 files changed, 164 insertions(+), 164 deletions(-) (limited to 'src/core/lib/iomgr/tcp_windows.c') diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index d00b966812..7da998e5e2 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -277,7 +277,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport_op *op) { channel_data *chand = elem->channel_data; - grpc_exec_ctx_push(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { @@ -296,9 +296,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->send_ping != NULL) { if (chand->lb_policy == NULL) { - grpc_exec_ctx_push(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("Ping with no load balancing"), - NULL); + grpc_exec_ctx_sched(exec_ctx, op->send_ping, + GRPC_ERROR_CREATE("Ping with no load balancing"), + NULL); } else { grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); op->bind_pollset = NULL; @@ -354,11 +354,11 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { - grpc_exec_ctx_push(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, cpa->connected_subchannel, cpa->on_ready)) { - grpc_exec_ctx_push(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); } gpr_free(cpa); } @@ -387,8 +387,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_exec_ctx_push(exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE("Pick cancelled"), NULL); + grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, + GRPC_ERROR_CREATE("Pick cancelled"), NULL); } } gpr_mu_unlock(&chand->mu_config); @@ -423,8 +423,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { - grpc_exec_ctx_push(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"), - NULL); + grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"), + NULL); } gpr_mu_unlock(&chand->mu_config); return 0; diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index dd61caea94..1e1b30c4c0 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -290,7 +290,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_refs; old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { - grpc_exec_ctx_push(exec_ctx, grpc_closure_create(subchannel_destroy, c), + grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c), GRPC_ERROR_NONE, NULL); } } diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c index 07b7813482..14022e9095 100644 --- a/src/core/ext/client_config/subchannel_call_holder.c +++ b/src/core/ext/client_config/subchannel_call_holder.c @@ -222,8 +222,8 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, holder->waiting_ops_count = 0; holder->waiting_ops_capacity = 0; GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_exec_ctx_push(exec_ctx, grpc_closure_create(retry_ops, a), - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a), + GRPC_ERROR_NONE, NULL); } static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index b5c2c426de..deb3f519fa 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -121,7 +121,7 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { *pp->target = NULL; grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); pp = next; } @@ -140,8 +140,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Pick Cancelled"), NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE("Pick Cancelled"), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -166,8 +166,8 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, initial_metadata_flags_eq) { grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); - grpc_exec_ctx_push(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Pick Cancelled"), NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE("Pick Cancelled"), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -306,16 +306,16 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); gpr_atm_rel_store(&p->selected, (gpr_atm)selected); - grpc_exec_ctx_push(exec_ctx, - grpc_closure_create(destroy_subchannels, p), - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, + grpc_closure_create(destroy_subchannels, p), + GRPC_ERROR_NONE, NULL); /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = selected; grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( @@ -368,8 +368,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, + NULL); gpr_free(pp); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, @@ -421,8 +421,8 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (selected) { grpc_connected_subchannel_ping(exec_ctx, selected, closure); } else { - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"), - NULL); + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"), + NULL); } } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 95305e9cdd..76d9d83c9b 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -239,7 +239,7 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE("Channel Shutdown"), NULL); gpr_free(pp); } @@ -267,7 +267,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -293,7 +293,7 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *pp->target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -412,7 +412,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } grpc_subchannel_notify_on_state_change( @@ -466,7 +466,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } @@ -521,7 +521,7 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel_ping(exec_ctx, target, closure); } else { gpr_mu_unlock(&p->mu); - grpc_exec_ctx_push(exec_ctx, closure, + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Round Robin not connected"), NULL); } } diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index adb9c3af84..e35aaba815 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -111,7 +111,7 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { } if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_exec_ctx_push(exec_ctx, r->next_completion, + grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_CREATE("Resolver Shutdown"), NULL); r->next_completion = NULL; } @@ -227,7 +227,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, if (r->resolved_config) { grpc_client_config_ref(r->resolved_config); } - grpc_exec_ctx_push(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; r->published_version = r->resolved_version; } diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 90da6a93a0..1f7cce2f43 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -92,7 +92,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_exec_ctx_push(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); @@ -133,7 +133,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); r->published = 1; *r->target_config = cfg; - grpc_exec_ctx_push(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; } } diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index bf4c817e2b..c06d3eb60a 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -75,7 +75,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_closure *destroy_done) { grpc_tcp_server *tcp = tcpp; grpc_tcp_server_unref(exec_ctx, tcp); - grpc_exec_ctx_push(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); } int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 5264b9e1f8..be517154e6 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -190,8 +190,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, and maybe they hold resources that need to be freed */ while (t->global.pings.next != &t->global.pings) { grpc_chttp2_outstanding_ping *ping = t->global.pings.next; - grpc_exec_ctx_push(exec_ctx, ping->on_recv, - GRPC_ERROR_CREATE("Transport closed"), NULL); + grpc_exec_ctx_sched(exec_ctx, ping->on_recv, + GRPC_ERROR_CREATE("Transport closed"), NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -643,7 +643,7 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, t->executor.writing_active = 1; REF_TRANSPORT(t, "writing"); prevent_endpoint_shutdown(t); - grpc_exec_ctx_push(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); } check_read_ops(exec_ctx, &t->global); @@ -907,7 +907,7 @@ void grpc_chttp2_complete_closure_step( stream_global->collecting_stats); stream_global->collecting_stats = NULL; } - grpc_exec_ctx_push(exec_ctx, closure, closure->error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, closure->error, NULL); } *pclosure = NULL; } @@ -1127,7 +1127,7 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, for (ping = transport_global->pings.next; ping != &transport_global->pings; ping = ping->next) { if (0 == memcmp(opaque_8bytes, ping->id, 8)) { - grpc_exec_ctx_push(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -1160,7 +1160,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, return; } - grpc_exec_ctx_push(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( @@ -1234,8 +1234,8 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer_publish( &stream_global->received_initial_metadata, stream_global->recv_initial_metadata); - grpc_exec_ctx_push(exec_ctx, stream_global->recv_initial_metadata_ready, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_initial_metadata_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_initial_metadata_ready = NULL; } if (stream_global->recv_message_ready != NULL) { @@ -1248,13 +1248,13 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames); GPR_ASSERT(*stream_global->recv_message != NULL); - grpc_exec_ctx_push(exec_ctx, stream_global->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_message_ready = NULL; } else if (stream_global->published_trailing_metadata) { *stream_global->recv_message = NULL; - grpc_exec_ctx_push(exec_ctx, stream_global->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_message_ready = NULL; } } @@ -1643,7 +1643,7 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); grpc_chttp2_prepare_to_read(transport_global, transport_parsing); - grpc_exec_ctx_push(exec_ctx, &t->parsing_action, error, NULL); + grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL); } else { post_reading_action_locked(exec_ctx, t, s_unused, arg); } @@ -1870,10 +1870,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, } if (bs->slices.count > 0) { *arg->slice = gpr_slice_buffer_take_first(&bs->slices); - grpc_exec_ctx_push(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL); } else if (bs->error != GRPC_ERROR_NONE) { - grpc_exec_ctx_push(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error), - NULL); + grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error), + NULL); } else { bs->on_next = arg->on_complete; bs->next = arg->slice; @@ -1930,7 +1930,7 @@ static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream; if (bs->on_next != NULL) { *bs->next = arg->slice; - grpc_exec_ctx_push(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); bs->on_next = NULL; } else { gpr_slice_buffer_add(&bs->slices, arg->slice); @@ -1968,7 +1968,7 @@ static void incoming_byte_stream_finished_failed_locked( grpc_chttp2_incoming_byte_stream *bs = a->bs; grpc_error *error = a->error; gpr_free(a); - grpc_exec_ctx_push(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); bs->on_next = NULL; GRPC_ERROR_UNREF(bs->error); bs->error = error; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index ca56debc84..ed6536b3c7 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -187,7 +187,7 @@ void grpc_chttp2_perform_writes( grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf, &transport_writing->done_cb); } else { - grpc_exec_ctx_push(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE, + grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE, NULL); } } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index ea0f2bcba4..c5318f092e 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -155,11 +155,11 @@ static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void enqueue_callbacks(grpc_closure *callback_list[]) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (callback_list[0]) { - grpc_exec_ctx_push(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL); callback_list[0] = NULL; } if (callback_list[1]) { - grpc_exec_ctx_push(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL); callback_list[1] = NULL; } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 4bda734bfa..bcd9c8c821 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -102,7 +102,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, grpc_error *error) { grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set, req->pollset); - grpc_exec_ctx_push(exec_ctx, req->on_done, error, NULL); + grpc_exec_ctx_sched(exec_ctx, req->on_done, error, NULL); grpc_http_parser_destroy(&req->parser); if (req->addresses != NULL) { grpc_resolved_addresses_destroy(req->addresses); diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 52efb0d6b2..e5eb16c3be 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -373,7 +373,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { if (!fd->released) { close(fd->fd); } - grpc_exec_ctx_push(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL); } static int fd_wrapped_fd(grpc_fd *fd) { @@ -438,8 +438,8 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_exec_ctx_push(exec_ctx, closure, fd_shutdown_error(fd->shutdown), - NULL); + grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown), + NULL); maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ @@ -462,7 +462,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_exec_ctx_push(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL); + grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL); *st = CLOSURE_NOT_READY; return 1; } @@ -811,7 +811,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } pollset->fd_count = 0; - grpc_exec_ctx_push(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } static void work_combine_error(grpc_error **composite, grpc_error *error) { diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index fcf39ffd54..9652f826c1 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -82,7 +82,7 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { grpc_exec_ctx_flush(exec_ctx); } -void grpc_exec_ctx_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, +void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error, grpc_workqueue *offload_target_or_null) { GPR_ASSERT(offload_target_or_null == NULL); diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 617f48f1b8..38f27d9b13 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -94,9 +94,9 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); /** Add a closure to be executed at the next flush/finish point */ -void grpc_exec_ctx_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error, - grpc_workqueue *offload_target_or_null); +void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error, + grpc_workqueue *offload_target_or_null); /** Returns true if we'd like to leave this execution context as soon as possible: useful for deciding whether to do something more or not depending on outside context */ diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index 5882d8d71d..d9f0f05ba6 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -109,7 +109,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->shutting_down = 1; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!pollset->is_iocp_worker) { - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); } else { pollset->on_shutdown = closure; } @@ -167,7 +167,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } if (pollset->shutting_down && pollset->on_shutdown != NULL) { - grpc_exec_ctx_push(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE, + grpc_exec_ctx_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE, NULL); pollset->on_shutdown = NULL; } diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 9a320dc952..4e9f978584 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -163,7 +163,7 @@ typedef struct { static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, grpc_error *error) { request *r = rp; - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, r->on_done, grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out), NULL); diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 22a9feb72d..fac4d13a27 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -154,7 +154,7 @@ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, } else { GRPC_ERROR_REF(error); } - grpc_exec_ctx_push(exec_ctx, r->on_done, error, NULL); + grpc_exec_ctx_sched(exec_ctx, r->on_done, error, NULL); gpr_free(r->name); gpr_free(r->default_port); gpr_free(r); diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c index 8919452751..fbc1bb5301 100644 --- a/src/core/lib/iomgr/socket_windows.c +++ b/src/core/lib/iomgr/socket_windows.c @@ -123,7 +123,7 @@ static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&socket->state_mu); if (info->has_pending_iocp) { info->has_pending_iocp = 0; - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); } else { info->closure = closure; } @@ -146,7 +146,7 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, GPR_ASSERT(!info->has_pending_iocp); gpr_mu_lock(&socket->state_mu); if (info->closure) { - grpc_exec_ctx_push(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL); info->closure = NULL; } else { info->has_pending_iocp = 1; diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 9c4ca1617f..078696c126 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -221,7 +221,7 @@ finish: gpr_free(ac->addr_str); gpr_free(ac); } - grpc_exec_ctx_push(exec_ctx, closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); } static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, @@ -250,7 +250,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); if (error != GRPC_ERROR_NONE) { - grpc_exec_ctx_push(exec_ctx, closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); return; } if (dsmode == GRPC_DSMODE_IPV4) { @@ -260,7 +260,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, addr_len = sizeof(addr4_copy); } if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) { - grpc_exec_ctx_push(exec_ctx, closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); return; } @@ -276,13 +276,13 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (err >= 0) { *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str); - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); goto done; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); - grpc_exec_ctx_push(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"), + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"), NULL); goto done; } diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index dbf116422f..6134d4476b 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -121,7 +121,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { async_connect_unlock_and_cleanup(ac, socket); /* If the connection was aborted, the callback was already called when the deadline was met. */ - grpc_exec_ctx_push(exec_ctx, on_done, error, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL); } /* Tries to issue one async connection, then schedules both an IOCP @@ -225,7 +225,7 @@ failure: } else if (sock != INVALID_SOCKET) { closesocket(sock); } - grpc_exec_ctx_push(exec_ctx, on_done, final_error, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL); } #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 48a19bed84..2f7c50ce35 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -175,7 +175,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, tcp->read_cb = NULL; tcp->incoming_buffer = NULL; - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } #define MAX_READ_IOVEC 4 @@ -277,7 +277,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->finished_edge = false; grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - grpc_exec_ctx_push(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL); } } @@ -410,7 +410,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); return; } tcp->outgoing_buffer = buf; @@ -422,7 +422,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->write_cb = cb; grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } GPR_TIMER_END("tcp_write", 0); diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index e5c6a3c549..6b10d3ebeb 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -154,7 +154,7 @@ grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_push(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); } gpr_mu_destroy(&s->mu); diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index 481111c9f9..b4f28a4c3b 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -121,7 +121,7 @@ grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_push(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); } /* Now that the accepts have been aborted, we can destroy the sockets. diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 88c9354f6e..383af9f12b 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -183,7 +183,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { tcp->read_cb = NULL; TCP_UNREF(tcp, "read"); - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -197,7 +197,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, WSABUF buffer; if (tcp->shutting_down) { - grpc_exec_ctx_push(exec_ctx, cb, + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -222,7 +222,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, /* Did we get data immediately ? Yay. */ if (info->wsa_error != WSAEWOULDBLOCK) { info->bytes_transfered = bytes_read; - grpc_exec_ctx_push(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE, NULL); return; } @@ -235,7 +235,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { info->wsa_error = wsa_error; - grpc_exec_ctx_push(exec_ctx, &tcp->on_read, + grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL); return; } @@ -267,7 +267,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { } TCP_UNREF(tcp, "write"); - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } /* Initiates a write. */ @@ -285,7 +285,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, size_t len; if (tcp->shutting_down) { - grpc_exec_ctx_push(exec_ctx, cb, + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -317,7 +317,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *error = status == 0 ? GRPC_ERROR_NONE : GRPC_WSA_ERROR(info->wsa_error, "WSASend"); - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); return; } @@ -334,7 +334,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { TCP_UNREF(tcp, "write"); - grpc_exec_ctx_push(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), NULL); return; } diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer.c index b46c6df287..7b0ef550a7 100644 --- a/src/core/lib/iomgr/timer.c +++ b/src/core/lib/iomgr/timer.c @@ -186,7 +186,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, if (!g_initialized) { timer->triggered = 1; - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, &timer->closure, GRPC_ERROR_CREATE("Attempt to create timer before initialization"), NULL); @@ -195,7 +195,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, if (gpr_time_cmp(deadline, now) <= 0) { timer->triggered = 1; - grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); return; } @@ -247,7 +247,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { shard_type *shard = &g_shards[shard_idx(timer)]; gpr_mu_lock(&shard->mu); if (!timer->triggered) { - grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL); timer->triggered = 1; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); @@ -313,7 +313,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, grpc_timer *timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { - grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_REF(error), NULL); n++; } *new_min_deadline = compute_min_deadline(shard); diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 6e8b1218be..7b6402e3a4 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -78,7 +78,7 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); /** Add a work item to a workqueue */ -void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - grpc_closure *closure, grpc_error *error); +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error); #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */ diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 297e8d3102..d2e3b5acb3 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -137,7 +137,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } } -void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error) { grpc_error *push_error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 23b51f33f6..072d468415 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -138,7 +138,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep, } } ep->read_buffer = NULL; - grpc_exec_ctx_push(exec_ctx, ep->read_cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, ep->read_cb, error, NULL); SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read"); } @@ -319,7 +319,7 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? */ gpr_slice_buffer_reset_and_unref(&ep->output_buffer); - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, cb, grpc_set_tsi_error_bits(GRPC_ERROR_CREATE("Wrap failed"), result), NULL); diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index d45ec4020c..acbce769fb 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -128,7 +128,7 @@ static void on_md_processing_done( grpc_metadata_batch_filter(calld->recv_initial_metadata, remove_consumed_md, elem); grpc_metadata_array_destroy(&calld->md); - grpc_exec_ctx_push(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL); } else { gpr_slice message; grpc_transport_stream_op close_op; @@ -146,7 +146,7 @@ static void on_md_processing_done( calld->transport_op.send_trailing_metadata = NULL; grpc_transport_stream_op_add_close(&close_op, status, &message); grpc_call_next_op(&exec_ctx, elem, &close_op); - grpc_exec_ctx_push(&exec_ctx, calld->on_done_recv, + grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, grpc_error_set_int(GRPC_ERROR_CREATE(error_details), GRPC_ERROR_INT_GRPC_STATUS, status), NULL); @@ -169,7 +169,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, return; } } - grpc_exec_ctx_push(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error), + grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error), NULL); } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 3915ba57a2..09167efb5c 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -743,7 +743,7 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, cc->call = c; cc->status = status; GRPC_CALL_INTERNAL_REF(c, "cancel"); - grpc_exec_ctx_push(exec_ctx, &cc->closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &cc->closure, GRPC_ERROR_NONE, NULL); return GRPC_CALL_OK; } @@ -970,7 +970,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, grpc_call *call = bctl->call; if (bctl->is_notify_tag_closure) { /* unrefs bctl->error */ - grpc_exec_ctx_push(exec_ctx, bctl->notify_tag, bctl->error, NULL); + grpc_exec_ctx_sched(exec_ctx, bctl->notify_tag, bctl->error, NULL); gpr_mu_lock(&call->mu); bctl->call->used_batches = (uint8_t)(bctl->call->used_batches & @@ -1129,7 +1129,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, grpc_closure *saved_rsr_closure = grpc_closure_create( receiving_stream_ready, call->saved_receiving_stream_ready_bctlp); call->saved_receiving_stream_ready_bctlp = NULL; - grpc_exec_ctx_push(exec_ctx, saved_rsr_closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, saved_rsr_closure, error, NULL); } gpr_mu_unlock(&call->mu); diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 5c15b676f8..8a0dcdcc07 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -94,14 +94,14 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->on_connectivity_state_change) { GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE); *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; - grpc_exec_ctx_push(exec_ctx, op->on_connectivity_state_change, + grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change, GRPC_ERROR_NONE, NULL); } if (op->on_consumed != NULL) { - grpc_exec_ctx_push(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); } if (op->send_ping != NULL) { - grpc_exec_ctx_push(exec_ctx, op->send_ping, + grpc_exec_ctx_sched(exec_ctx, op->send_ping, GRPC_ERROR_CREATE("lame client channel"), NULL); } GRPC_ERROR_UNREF(op->disconnect_with_error); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index f261321750..eae70717a5 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -344,8 +344,8 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, + NULL); } } @@ -526,7 +526,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, error, NULL); return; } @@ -571,8 +571,8 @@ static void finish_start_new_rpc( calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, + NULL); return; } @@ -757,8 +757,8 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, GRPC_ERROR_CREATE_REFERENCING("Missing :authority or :path", &error, 1); } - grpc_exec_ctx_push(exec_ctx, calld->on_done_recv_initial_metadata, error, - NULL); + grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv_initial_metadata, error, + NULL); } static void server_mutate_op(grpc_call_element *elem, @@ -794,8 +794,8 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE, NULL); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -1339,8 +1339,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE, NULL); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 3286af9fb2..f714e385de 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -79,7 +79,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, } else { error = GRPC_ERROR_CREATE("Shutdown connectivity owner"); } - grpc_exec_ctx_push(exec_ctx, w->notify, error, NULL); + grpc_exec_ctx_sched(exec_ctx, w->notify, error, NULL); gpr_free(w); } GRPC_ERROR_UNREF(tracker->current_error); @@ -114,7 +114,7 @@ int grpc_connectivity_state_notify_on_state_change( if (current == NULL) { grpc_connectivity_state_watcher *w = tracker->watchers; if (w != NULL && w->notify == notify) { - grpc_exec_ctx_push(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); tracker->watchers = w->next; gpr_free(w); return 0; @@ -122,7 +122,7 @@ int grpc_connectivity_state_notify_on_state_change( while (w != NULL) { grpc_connectivity_state_watcher *rm_candidate = w->next; if (rm_candidate != NULL && rm_candidate->notify == notify) { - grpc_exec_ctx_push(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); w->next = w->next->next; gpr_free(rm_candidate); return 0; @@ -133,7 +133,7 @@ int grpc_connectivity_state_notify_on_state_change( } else { if (tracker->current_state != *current) { *current = tracker->current_state; - grpc_exec_ctx_push(exec_ctx, notify, + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(tracker->current_error), NULL); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); @@ -179,7 +179,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, while ((w = tracker->watchers) != NULL) { *w->current = tracker->current_state; tracker->watchers = w->next; - grpc_exec_ctx_push(exec_ctx, w->notify, + grpc_exec_ctx_sched(exec_ctx, w->notify, GRPC_ERROR_REF(tracker->current_error), NULL); gpr_free(w); } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index fdf0f4b2aa..10d58153af 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -60,7 +60,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount) { #endif if (gpr_unref(&refcount->refs)) { - grpc_exec_ctx_push(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE, NULL); } } @@ -146,11 +146,11 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, grpc_error *error) { - grpc_exec_ctx_push(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error), + grpc_exec_ctx_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error), NULL); - grpc_exec_ctx_push(exec_ctx, op->recv_initial_metadata_ready, + grpc_exec_ctx_sched(exec_ctx, op->recv_initial_metadata_ready, GRPC_ERROR_REF(error), NULL); - grpc_exec_ctx_push(exec_ctx, op->on_complete, error, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL); } void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, diff --git a/test/core/end2end/fuzzers/api_fuzzer.c b/test/core/end2end/fuzzers/api_fuzzer.c index aa715c841c..d00a67ffc9 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.c +++ b/test/core/end2end/fuzzers/api_fuzzer.c @@ -200,9 +200,9 @@ static void finish_resolve(grpc_exec_ctx *exec_ctx, void *arg, addrs->addrs = gpr_malloc(sizeof(*addrs->addrs)); addrs->addrs[0].len = 0; *r->addrs = addrs; - grpc_exec_ctx_push(exec_ctx, r->on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, r->on_done, GRPC_ERROR_NONE, NULL); } else { - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, r->on_done, GRPC_ERROR_CREATE_REFERENCING("Resolution failed", &error, 1), NULL); } @@ -247,7 +247,7 @@ static void do_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { future_connect *fc = arg; if (error != GRPC_ERROR_NONE) { *fc->ep = NULL; - grpc_exec_ctx_push(exec_ctx, fc->closure, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, fc->closure, GRPC_ERROR_REF(error), NULL); } else if (g_server != NULL) { grpc_endpoint *client; grpc_endpoint *server; @@ -259,7 +259,7 @@ static void do_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_server_setup_transport(exec_ctx, g_server, transport, NULL, NULL); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); - grpc_exec_ctx_push(exec_ctx, fc->closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, fc->closure, GRPC_ERROR_NONE, NULL); } else { sched_connect(exec_ctx, fc->closure, fc->ep, fc->deadline); } @@ -270,8 +270,8 @@ static void sched_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, gpr_timespec deadline) { if (gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) < 0) { *ep = NULL; - grpc_exec_ctx_push(exec_ctx, closure, - GRPC_ERROR_CREATE("Connect deadline exceeded"), NULL); + grpc_exec_ctx_sched(exec_ctx, closure, + GRPC_ERROR_CREATE("Connect deadline exceeded"), NULL); return; } diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index 8fd847b878..71c0dbfb58 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -216,7 +216,7 @@ static void recv_im_ready(grpc_exec_ctx *exec_ctx, void *arg, &message); grpc_call_next_op(exec_ctx, elem, &op); } - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, calld->recv_im_ready, GRPC_ERROR_CREATE_REFERENCING("Forced call to close", &error, 1), NULL); } diff --git a/test/core/internal_api_canaries/iomgr.c b/test/core/internal_api_canaries/iomgr.c index 71a1bb1436..5e86c42309 100644 --- a/test/core/internal_api_canaries/iomgr.c +++ b/test/core/internal_api_canaries/iomgr.c @@ -72,7 +72,7 @@ static void test_code(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx); - grpc_exec_ctx_push(&exec_ctx, &closure, GRPC_ERROR_CREATE("Foo"), NULL); + grpc_exec_ctx_sched(&exec_ctx, &closure, GRPC_ERROR_CREATE("Foo"), NULL); grpc_exec_ctx_enqueue_list(&exec_ctx, &closure_list, NULL); /* endpoint.h */ diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c index 9a359cd799..76ecfae74b 100644 --- a/test/core/iomgr/workqueue_test.c +++ b/test/core/iomgr/workqueue_test.c @@ -73,7 +73,7 @@ static void test_add_closure(void) { grpc_pollset_worker *worker = NULL; grpc_closure_init(&c, must_succeed, &done); - grpc_workqueue_push(&exec_ctx, wq, &c, GRPC_ERROR_NONE); + grpc_workqueue_enqueue(&exec_ctx, wq, &c, GRPC_ERROR_NONE); grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); gpr_mu_lock(g_mu); @@ -103,7 +103,7 @@ static void test_flush(void) { grpc_pollset_worker *worker = NULL; grpc_closure_init(&c, must_succeed, &done); - grpc_exec_ctx_push(&exec_ctx, &c, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, &c, GRPC_ERROR_NONE, NULL); grpc_workqueue_flush(&exec_ctx, wq); grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); diff --git a/test/core/security/credentials_test.c b/test/core/security/credentials_test.c index ec417b84dc..e703dbdeb6 100644 --- a/test/core/security/credentials_test.c +++ b/test/core/security/credentials_test.c @@ -560,7 +560,7 @@ static int compute_engine_httpcli_get_success_override( grpc_httpcli_response *response) { validate_compute_engine_http_request(request); *response = http_response(200, valid_oauth2_json_response); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -570,7 +570,7 @@ static int compute_engine_httpcli_get_failure_override( grpc_httpcli_response *response) { validate_compute_engine_http_request(request); *response = http_response(403, "Not Authorized."); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -663,7 +663,7 @@ static int refresh_token_httpcli_post_success( grpc_closure *on_done, grpc_httpcli_response *response) { validate_refresh_token_http_request(request, body, body_size); *response = http_response(200, valid_oauth2_json_response); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -673,7 +673,7 @@ static int refresh_token_httpcli_post_failure( grpc_closure *on_done, grpc_httpcli_response *response) { validate_refresh_token_http_request(request, body, body_size); *response = http_response(403, "Not Authorized."); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -915,7 +915,7 @@ static int default_creds_gce_detection_httpcli_get_success_override( response->hdrs = headers; GPR_ASSERT(strcmp(request->http.path, "/") == 0); GPR_ASSERT(strcmp(request->host, "metadata.google.internal") == 0); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -973,7 +973,7 @@ static int default_creds_gce_detection_httpcli_get_failure_override( GPR_ASSERT(strcmp(request->http.path, "/") == 0); GPR_ASSERT(strcmp(request->host, "metadata.google.internal") == 0); *response = http_response(200, ""); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } diff --git a/test/core/security/jwt_verifier_test.c b/test/core/security/jwt_verifier_test.c index 23b46958f4..36b331a777 100644 --- a/test/core/security/jwt_verifier_test.c +++ b/test/core/security/jwt_verifier_test.c @@ -294,7 +294,7 @@ static int httpcli_get_google_keys_for_email( "/robot/v1/metadata/x509/" "777-abaslkan11hlb6nmim3bpspl31ud@developer." "gserviceaccount.com") == 0); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -338,7 +338,7 @@ static int httpcli_get_custom_keys_for_email( GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "keys.bar.com") == 0); GPR_ASSERT(strcmp(request->http.path, "/jwk/foo@bar.com") == 0); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -372,7 +372,7 @@ static int httpcli_get_jwk_set(grpc_exec_ctx *exec_ctx, GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "www.googleapis.com") == 0); GPR_ASSERT(strcmp(request->http.path, "/oauth2/v3/certs") == 0); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -387,7 +387,7 @@ static int httpcli_get_openid_config(grpc_exec_ctx *exec_ctx, GPR_ASSERT(strcmp(request->http.path, GRPC_OPENID_CONFIG_URL_SUFFIX) == 0); grpc_httpcli_set_override(httpcli_get_jwk_set, httpcli_post_should_not_be_called); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -427,7 +427,7 @@ static int httpcli_get_bad_json(grpc_exec_ctx *exec_ctx, grpc_httpcli_response *response) { *response = http_response(200, gpr_strdup("{\"bad\": \"stuff\"}")); GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index deef68ef59..e3df5b1841 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -51,7 +51,7 @@ static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_mu_lock(&m->mu); if (m->read_buffer.count > 0) { gpr_slice_buffer_swap(&m->read_buffer, slices); - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); } else { m->on_read = cb; m->on_read_out = slices; @@ -65,7 +65,7 @@ static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, for (size_t i = 0; i < slices->count; i++) { m->on_write(slices->slices[i]); } - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); } static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -78,7 +78,7 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; gpr_mu_lock(&m->mu); if (m->on_read) { - grpc_exec_ctx_push(exec_ctx, m->on_read, + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Endpoint Shutdown"), NULL); m->on_read = NULL; } @@ -116,7 +116,7 @@ void grpc_mock_endpoint_put_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_mu_lock(&m->mu); if (m->on_read != NULL) { gpr_slice_buffer_add(m->on_read_out, slice); - grpc_exec_ctx_push(exec_ctx, m->on_read, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_NONE, NULL); m->on_read = NULL; } else { gpr_slice_buffer_add(&m->read_buffer, slice); diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index 93753be251..bf897d8f7e 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -59,11 +59,11 @@ static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, half *m = (half *)ep; gpr_mu_lock(&m->parent->mu); if (m->parent->shutdown) { - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_CREATE("Already shutdown"), + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_CREATE("Already shutdown"), NULL); } else if (m->read_buffer.count > 0) { gpr_slice_buffer_swap(&m->read_buffer, slices); - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); } else { m->on_read = cb; m->on_read_out = slices; @@ -87,7 +87,7 @@ static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, for (size_t i = 0; i < slices->count; i++) { gpr_slice_buffer_add(m->on_read_out, gpr_slice_ref(slices->slices[i])); } - grpc_exec_ctx_push(exec_ctx, m->on_read, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_NONE, NULL); m->on_read = NULL; } else { for (size_t i = 0; i < slices->count; i++) { @@ -95,7 +95,7 @@ static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } } gpr_mu_unlock(&m->parent->mu); - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -109,13 +109,13 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { gpr_mu_lock(&m->parent->mu); m->parent->shutdown = true; if (m->on_read) { - grpc_exec_ctx_push(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), NULL); m->on_read = NULL; } m = other_half(m); if (m->on_read) { - grpc_exec_ctx_push(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), NULL); m->on_read = NULL; } -- cgit v1.2.3 From 77c983dc87dcf19b3e869247f90063ef4c10af3e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 May 2016 13:23:14 -0700 Subject: clang-format --- src/core/ext/client_config/subchannel.c | 2 +- src/core/ext/lb_policy/round_robin/round_robin.c | 12 +++++++----- src/core/ext/resolver/dns/native/dns_resolver.c | 2 +- src/core/ext/transport/chttp2/transport/writing.c | 2 +- src/core/lib/iomgr/exec_ctx.c | 4 ++-- src/core/lib/iomgr/pollset_windows.c | 2 +- src/core/lib/iomgr/tcp_client_posix.c | 2 +- src/core/lib/iomgr/tcp_windows.c | 8 ++++---- src/core/lib/iomgr/workqueue_posix.c | 2 +- src/core/lib/security/transport/server_auth_filter.c | 8 ++++---- src/core/lib/surface/lame_client.c | 4 ++-- src/core/lib/transport/connectivity_state.c | 4 ++-- src/core/lib/transport/transport.c | 4 ++-- test/core/util/mock_endpoint.c | 2 +- test/core/util/passthru_endpoint.c | 6 +++--- 15 files changed, 33 insertions(+), 31 deletions(-) (limited to 'src/core/lib/iomgr/tcp_windows.c') diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 1e1b30c4c0..c6c7b7a3a0 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -291,7 +291,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c), - GRPC_ERROR_NONE, NULL); + GRPC_ERROR_NONE, NULL); } } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 76d9d83c9b..42356d8b0b 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -240,7 +240,7 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { p->pending_picks = pp->next; *pp->target = NULL; grpc_exec_ctx_sched(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Channel Shutdown"), NULL); + GRPC_ERROR_CREATE("Channel Shutdown"), NULL); gpr_free(pp); } grpc_connectivity_state_set( @@ -267,7 +267,8 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, + NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -293,7 +294,8 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, + NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -467,7 +469,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->pending_picks = pp->next; *pp->target = NULL; grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, - NULL); + NULL); gpr_free(pp); } } else { @@ -522,7 +524,7 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } else { gpr_mu_unlock(&p->mu); grpc_exec_ctx_sched(exec_ctx, closure, - GRPC_ERROR_CREATE("Round Robin not connected"), NULL); + GRPC_ERROR_CREATE("Round Robin not connected"), NULL); } } diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index e35aaba815..d6e7c89ef8 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -112,7 +112,7 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { if (r->next_completion != NULL) { *r->target_config = NULL; grpc_exec_ctx_sched(exec_ctx, r->next_completion, - GRPC_ERROR_CREATE("Resolver Shutdown"), NULL); + GRPC_ERROR_CREATE("Resolver Shutdown"), NULL); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index ed6536b3c7..add7678182 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -188,7 +188,7 @@ void grpc_chttp2_perform_writes( &transport_writing->done_cb); } else { grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE, - NULL); + NULL); } } diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 9652f826c1..c44aafcddf 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -83,8 +83,8 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { } void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error, - grpc_workqueue *offload_target_or_null) { + grpc_error *error, + grpc_workqueue *offload_target_or_null) { GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index d9f0f05ba6..626dd784b3 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -168,7 +168,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->shutting_down && pollset->on_shutdown != NULL) { grpc_exec_ctx_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE, - NULL); + NULL); pollset->on_shutdown = NULL; } goto done; diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 078696c126..80c7a3f128 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -283,7 +283,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (errno != EWOULDBLOCK && errno != EINPROGRESS) { grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); grpc_exec_ctx_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"), - NULL); + NULL); goto done; } diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 383af9f12b..b2003675be 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -198,7 +198,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (tcp->shutting_down) { grpc_exec_ctx_sched(exec_ctx, cb, - GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); + GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -236,7 +236,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (wsa_error != WSA_IO_PENDING) { info->wsa_error = wsa_error; grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, - GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL); + GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL); return; } } @@ -286,7 +286,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (tcp->shutting_down) { grpc_exec_ctx_sched(exec_ctx, cb, - GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); + GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -335,7 +335,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (wsa_error != WSA_IO_PENDING) { TCP_UNREF(tcp, "write"); grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), - NULL); + NULL); return; } } diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index d2e3b5acb3..45e0f6063b 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -138,7 +138,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - grpc_closure *closure, grpc_error *error) { + grpc_closure *closure, grpc_error *error) { grpc_error *push_error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index acbce769fb..34120aba42 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -147,9 +147,9 @@ static void on_md_processing_done( grpc_transport_stream_op_add_close(&close_op, status, &message); grpc_call_next_op(&exec_ctx, elem, &close_op); grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, - grpc_error_set_int(GRPC_ERROR_CREATE(error_details), - GRPC_ERROR_INT_GRPC_STATUS, status), - NULL); + grpc_error_set_int(GRPC_ERROR_CREATE(error_details), + GRPC_ERROR_INT_GRPC_STATUS, status), + NULL); } grpc_exec_ctx_finish(&exec_ctx); @@ -170,7 +170,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, } } grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error), - NULL); + NULL); } static void set_recv_ops_md_callbacks(grpc_call_element *elem, diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 8a0dcdcc07..6c94de555a 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -95,14 +95,14 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE); *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change, - GRPC_ERROR_NONE, NULL); + GRPC_ERROR_NONE, NULL); } if (op->on_consumed != NULL) { grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); } if (op->send_ping != NULL) { grpc_exec_ctx_sched(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("lame client channel"), NULL); + GRPC_ERROR_CREATE("lame client channel"), NULL); } GRPC_ERROR_UNREF(op->disconnect_with_error); } diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index f714e385de..062d0b3742 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -134,7 +134,7 @@ int grpc_connectivity_state_notify_on_state_change( if (tracker->current_state != *current) { *current = tracker->current_state; grpc_exec_ctx_sched(exec_ctx, notify, - GRPC_ERROR_REF(tracker->current_error), NULL); + GRPC_ERROR_REF(tracker->current_error), NULL); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; @@ -180,7 +180,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, *w->current = tracker->current_state; tracker->watchers = w->next; grpc_exec_ctx_sched(exec_ctx, w->notify, - GRPC_ERROR_REF(tracker->current_error), NULL); + GRPC_ERROR_REF(tracker->current_error), NULL); gpr_free(w); } } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 10d58153af..ec6833e323 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -147,9 +147,9 @@ void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, grpc_error *error) { grpc_exec_ctx_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error), - NULL); + NULL); grpc_exec_ctx_sched(exec_ctx, op->recv_initial_metadata_ready, - GRPC_ERROR_REF(error), NULL); + GRPC_ERROR_REF(error), NULL); grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL); } diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index e3df5b1841..ed9545e9df 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -79,7 +79,7 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { gpr_mu_lock(&m->mu); if (m->on_read) { grpc_exec_ctx_sched(exec_ctx, m->on_read, - GRPC_ERROR_CREATE("Endpoint Shutdown"), NULL); + GRPC_ERROR_CREATE("Endpoint Shutdown"), NULL); m->on_read = NULL; } gpr_mu_unlock(&m->mu); diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index bf897d8f7e..a39f3dd66e 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -60,7 +60,7 @@ static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_mu_lock(&m->parent->mu); if (m->parent->shutdown) { grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_CREATE("Already shutdown"), - NULL); + NULL); } else if (m->read_buffer.count > 0) { gpr_slice_buffer_swap(&m->read_buffer, slices); grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); @@ -110,13 +110,13 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { m->parent->shutdown = true; if (m->on_read) { grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), - NULL); + NULL); m->on_read = NULL; } m = other_half(m); if (m->on_read) { grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), - NULL); + NULL); m->on_read = NULL; } gpr_mu_unlock(&m->parent->mu); -- cgit v1.2.3