diff options
author | Craig Tiller <ctiller@google.com> | 2016-11-30 13:41:53 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-11-30 13:41:53 -0800 |
commit | 36b3135929cf1561d35039fc9e04e038f5351ed7 (patch) | |
tree | b20b4f6ab4043fae9a170adb711b488be0bd1a59 /src/core | |
parent | 0f97958b64a792e551aa3bde84bb8f53b04de3b4 (diff) | |
parent | 369ddc524a6ca55afb8e8bd6743ed5624e1a94ce (diff) |
Merge branch 'slice_interning' into metadata_filter
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/client_channel/subchannel.c | 138 | ||||
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 17 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 19 | ||||
-rw-r--r-- | src/core/lib/channel/http_client_filter.c | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_linux.c | 31 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.c | 29 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.h | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr.c | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.c | 30 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_windows.c | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_windows.h | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.c | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_windows.c | 26 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_windows.c | 86 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_windows.c | 2 | ||||
-rw-r--r-- | src/core/lib/support/subprocess_posix.c | 7 | ||||
-rw-r--r-- | src/core/lib/transport/connectivity_state.c | 13 | ||||
-rw-r--r-- | src/core/lib/transport/connectivity_state.h | 5 |
18 files changed, 263 insertions, 164 deletions
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 063b79cece..d38f7985a5 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -120,9 +120,9 @@ struct grpc_subchannel { gpr_mu mu; /** have we seen a disconnection? */ - int disconnected; + bool disconnected; /** are we connecting */ - int connecting; + bool connecting; /** connectivity state tracking */ grpc_connectivity_state_tracker state_tracker; @@ -133,7 +133,9 @@ struct grpc_subchannel { /** backoff state */ gpr_backoff backoff_state; /** do we have an active alarm? */ - int have_alarm; + bool have_alarm; + /** have we started the backoff loop */ + bool backoff_begun; /** our alarm */ grpc_timer alarm; }; @@ -265,7 +267,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_subchannel_index_unregister(exec_ctx, c->key, c); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); - c->disconnected = 1; + c->disconnected = true; grpc_connector_shutdown(exec_ctx, c->connector); con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != NULL) { @@ -371,7 +373,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, return grpc_subchannel_index_register(exec_ctx, key, c); } -static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { +static void continue_connect_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c) { grpc_connect_in_args args; args.interested_parties = c->pollset_set; @@ -387,12 +390,6 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { &c->connected); } -static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { - c->next_attempt = - gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); - continue_connect(exec_ctx, c); -} - grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c, grpc_error **error) { grpc_connectivity_state state; @@ -419,6 +416,73 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, follow_up->cb(exec_ctx, follow_up->cb_arg, error); } +static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_subchannel *c = arg; + gpr_mu_lock(&c->mu); + c->have_alarm = false; + if (c->disconnected) { + error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1); + } else { + GRPC_ERROR_REF(error); + } + if (error == GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); + c->next_attempt = + gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); + continue_connect_locked(exec_ctx, c); + gpr_mu_unlock(&c->mu); + } else { + gpr_mu_unlock(&c->mu); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); + } + GRPC_ERROR_UNREF(error); +} + +static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c) { + if (c->disconnected) { + /* Don't try to connect if we're already disconnected */ + return; + } + + if (c->connecting) { + /* Already connecting: don't restart */ + return; + } + + if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != NULL) { + /* Already connected: don't restart */ + return; + } + + if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) { + /* Nobody is interested in connecting: so don't just yet */ + return; + } + + c->connecting = true; + GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); + + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + if (!c->backoff_begun) { + c->backoff_begun = true; + c->next_attempt = gpr_backoff_begin(&c->backoff_state, now); + continue_connect_locked(exec_ctx, c); + } else { + GPR_ASSERT(!c->have_alarm); + c->have_alarm = true; + gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now); + if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <= + 0) { + gpr_log(GPR_INFO, "Retry immediately"); + } else { + gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds", + time_til_next.tv_sec, time_til_next.tv_nsec); + } + grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); + } +} + void grpc_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset_set *interested_parties, grpc_connectivity_state *state, @@ -450,13 +514,9 @@ void grpc_subchannel_notify_on_state_change( w->next = &c->root_external_state_watcher; w->prev = w->next->prev; w->next->prev = w->prev->next = w; - if (grpc_connectivity_state_notify_on_state_change( - exec_ctx, &c->state_tracker, state, &w->closure)) { - c->connecting = 1; - /* released by connection */ - GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); - start_connect(exec_ctx, c); - } + grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker, + state, &w->closure); + maybe_start_connecting_locked(exec_ctx, c); gpr_mu_unlock(&c->mu); } } @@ -576,7 +636,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, Re-evaluate if we really need this. */ gpr_atm_full_barrier(); GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); - c->connecting = 0; /* setup subchannel watching connected subchannel for changes; subchannel ref @@ -593,28 +652,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE, "connected"); } -static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_subchannel *c = arg; - gpr_mu_lock(&c->mu); - c->have_alarm = 0; - if (c->disconnected) { - error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1); - } else { - GRPC_ERROR_REF(error); - } - if (error == GRPC_ERROR_NONE) { - gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->next_attempt = - gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); - continue_connect(exec_ctx, c); - gpr_mu_unlock(&c->mu); - } else { - gpr_mu_unlock(&c->mu); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); - } - GRPC_ERROR_UNREF(error); -} - static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_subchannel *c = arg; @@ -622,35 +659,28 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); gpr_mu_lock(&c->mu); + c->connecting = false; if (c->connecting_result.transport != NULL) { publish_transport_locked(exec_ctx, c); } else if (c->disconnected) { GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } else { - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - GPR_ASSERT(!c->have_alarm); - c->have_alarm = 1; grpc_connectivity_state_set( exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_set_int( GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), "connect_failed"); - gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now); + const char *errmsg = grpc_error_string(error); gpr_log(GPR_INFO, "Connect failed: %s", errmsg); - if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <= - 0) { - gpr_log(GPR_INFO, "Retry immediately"); - } else { - gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds", - time_til_next.tv_sec, time_til_next.tv_nsec); - } - grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); grpc_error_free_string(errmsg); + + maybe_start_connecting_locked(exec_ctx, c); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } gpr_mu_unlock(&c->mu); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected"); grpc_channel_args_destroy(exec_ctx, delete_channel_args); } diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index ab4af321f6..34fd388920 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -183,16 +183,14 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, NULL); if (wc_arg->rr_policy != NULL) { - /* if target is NULL, no pick has been made by the RR policy (eg, all + /* if *target is NULL, no pick has been made by the RR policy (eg, all * addresses failed to connect). There won't be any user_data/token * available */ - if (wc_arg->target != NULL) { + if (*wc_arg->target != NULL) { if (!GRPC_MDISNULL(wc_arg->lb_token)) { - GRPC_LOG_IF_ERROR( - "grpclb.initial_metadata_add_lb_token", - initial_metadata_add_lb_token(wc_arg->initial_metadata, - wc_arg->lb_token_mdelem_storage, - GRPC_MDELEM_REF(wc_arg->lb_token))); + initial_metadata_add_lb_token(wc_arg->initial_metadata, + wc_arg->lb_token_mdelem_storage, + GRPC_MDELEM_REF(wc_arg->lb_token)); } else { gpr_log(GPR_ERROR, "No LB token for connected subchannel pick %p (from RR " @@ -1244,12 +1242,15 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, gpr_log(GPR_INFO, "Incoming server list identical to current, ignoring."); } + grpc_grpclb_destroy_serverlist(serverlist); } else { /* new serverlist */ if (glb_policy->serverlist != NULL) { /* dispose of the old serverlist */ grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } - /* and update the copy in the glb_lb_policy instance */ + /* and update the copy in the glb_lb_policy instance. This serverlist + * instance will be destroyed either upon the next update or in + * glb_destroy() */ glb_policy->serverlist = serverlist; rr_handover_locked(exec_ctx, glb_policy); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 3e72c7cc2c..f84e5a610b 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -957,6 +957,19 @@ static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs, static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} +static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id, + bool is_client, bool is_initial) { + for (grpc_linked_mdelem *md = md_batch->list.head; md != md_batch->list.tail; + md = md->next) { + char *key = grpc_dump_slice(GRPC_MDKEY(md->md), GPR_DUMP_ASCII); + char *value = grpc_dump_slice(GRPC_MDVALUE(md->md), GPR_DUMP_ASCII); + gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL", + is_client ? "CLI" : "SVR", key, value); + gpr_free(key); + gpr_free(value); + } +} + static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_error *error_ignored) { GPR_TIMER_BEGIN("perform_stream_op_locked", 0); @@ -970,6 +983,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str, op->on_complete); gpr_free(str); + if (op->send_initial_metadata) { + log_metadata(op->send_initial_metadata, s->id, t->is_client, true); + } + if (op->send_trailing_metadata) { + log_metadata(op->send_trailing_metadata, s->id, t->is_client, false); + } } grpc_closure *on_complete = op->on_complete; diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 9fd4210020..3b47147685 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -258,12 +258,15 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, message, and the payload is below the size threshold, and all the data for this request is immediately available. */ grpc_mdelem method = GRPC_MDELEM_METHOD_POST; - calld->send_message_blocked = false; if ((op->send_initial_metadata_flags & GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) && op->send_message != NULL && op->send_message->length < channeld->max_payload_size_for_get) { method = GRPC_MDELEM_METHOD_GET; + /* The following write to calld->send_message_blocked isn't racy with + reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because + being here means ops->send_message is not NULL, which is primarily + guarding the read there. */ calld->send_message_blocked = true; } else if (op->send_initial_metadata_flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { @@ -380,6 +383,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, calld->on_done_recv_trailing_metadata = NULL; calld->on_complete = NULL; calld->payload_bytes = NULL; + calld->send_message_blocked = false; grpc_slice_buffer_init(&calld->slices); grpc_closure_init(&calld->hc_on_recv_initial_metadata, hc_on_recv_initial_metadata, elem); diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 86674ca8c6..843ca42422 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -71,6 +71,11 @@ static int grpc_polling_trace = 0; /* Disabled by default */ static int grpc_wakeup_signal = -1; static bool is_grpc_wakeup_signal_initialized = false; +/* TODO: sreek: Right now, this wakes up all pollers. In future we should make + * sure to wake up one polling thread (which can wake up other threads if + * needed) */ +static grpc_wakeup_fd global_wakeup_fd; + /* Implements the function defined in grpc_posix.h. This function might be * called before even calling grpc_init() to set either a different signal to * use. If signum == -1, then the use of signals is disabled */ @@ -436,9 +441,8 @@ static void polling_island_add_wakeup_fd_locked(polling_island *pi, gpr_asprintf(&err_msg, "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with " "error: %d (%s)", - pi->epoll_fd, - GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno, - strerror(errno)); + pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd), + errno, strerror(errno)); append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); gpr_free(err_msg); } @@ -540,7 +544,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, goto done; } - polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error); + polling_island_add_wakeup_fd_locked(pi, &global_wakeup_fd, error); polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error); if (initial_fd != NULL) { @@ -842,11 +846,6 @@ static void polling_island_global_shutdown() { * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a * case occurs. */ -/* TODO: sreek: Right now, this wakes up all pollers. In future we should make - * sure to wake up one polling thread (which can wake up other threads if - * needed) */ -grpc_wakeup_fd grpc_global_wakeup_fd; - static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; @@ -1162,11 +1161,11 @@ static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_pollset); gpr_tls_init(&g_current_thread_worker); poller_kick_init(); - return grpc_wakeup_fd_init(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_init(&global_wakeup_fd); } static void pollset_global_shutdown(void) { - grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); + grpc_wakeup_fd_destroy(&global_wakeup_fd); gpr_tls_destroy(&g_current_thread_pollset); gpr_tls_destroy(&g_current_thread_worker); } @@ -1273,7 +1272,7 @@ static grpc_error *pollset_kick(grpc_pollset *p, } static grpc_error *kick_poller(void) { - return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_wakeup(&global_wakeup_fd); } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { @@ -1500,13 +1499,11 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, for (int i = 0; i < ep_rv; ++i) { void *data_ptr = ep_ev[i].data.ptr; - if (data_ptr == &grpc_global_wakeup_fd) { - append_error(error, - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd), + if (data_ptr == &global_wakeup_fd) { + append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); } else if (data_ptr == &pi->workqueue_wakeup_fd) { - append_error(error, - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd), + append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); maybe_do_workqueue_work(exec_ctx, pi); } else if (data_ptr == &polling_island_wakeup_fd) { diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index e1d620cfff..21b28e5554 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -120,6 +120,8 @@ struct grpc_fd { grpc_pollset *read_notifier_pollset; }; +static grpc_wakeup_fd global_wakeup_fd; + /* Begin polling on an fd. Registers that the given pollset is interested in this fd - so that if read or writability interest changes, the pollset can be kicked to pick up that @@ -769,17 +771,17 @@ static grpc_error *pollset_kick(grpc_pollset *p, static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); gpr_tls_init(&g_current_thread_worker); - return grpc_wakeup_fd_init(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_init(&global_wakeup_fd); } static void pollset_global_shutdown(void) { - grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); + grpc_wakeup_fd_destroy(&global_wakeup_fd); gpr_tls_destroy(&g_current_thread_poller); gpr_tls_destroy(&g_current_thread_worker); } static grpc_error *kick_poller(void) { - return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_wakeup(&global_wakeup_fd); } /* main interface */ @@ -947,7 +949,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, fd_count = 0; pfd_count = 2; - pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); + pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd); pfds[0].events = POLLIN; pfds[0].revents = 0; pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd); @@ -1001,8 +1003,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else { if (pfds[0].revents & POLLIN_CHECK) { - work_combine_error( - &error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd)); } if (pfds[1].revents & POLLIN_CHECK) { work_combine_error( @@ -1343,6 +1345,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { int res, idx; gpr_cv *pollcv; cv_node *cvn, *prev; + int skip_poll = 0; nfds_t nsockfds = 0; gpr_thd_id t_id; gpr_thd_options opt; @@ -1358,17 +1361,17 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { cvn->cv = pollcv; cvn->next = g_cvfds.cvfds[idx].cvs; g_cvfds.cvfds[idx].cvs = cvn; - // We should return immediately if there are pending events, - // but we still need to call poll() to check for socket events + // Don't bother polling if a wakeup fd is ready if (g_cvfds.cvfds[idx].is_set) { - timeout = 0; + skip_poll = 1; } } else if (fds[i].fd >= 0) { nsockfds++; } } - if (nsockfds > 0) { + res = 0; + if (!skip_poll && nsockfds > 0) { pargs = gpr_malloc(sizeof(struct poll_args)); // Both the main thread and calling thread get a reference gpr_ref_init(&pargs->refcount, 2); @@ -1398,16 +1401,14 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { res = pargs->retval; errno = pargs->err; } else { - res = 0; errno = 0; gpr_atm_no_barrier_store(&pargs->status, CANCELLED); } - } else { + } else if (!skip_poll) { gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); deadline = gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); gpr_cv_wait(pollcv, &g_cvfds.mu, deadline); - res = 0; } idx = 0; @@ -1431,7 +1432,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { fds[i].revents = POLLIN; if (res >= 0) res++; } - } else if (fds[i].fd >= 0 && + } else if (!skip_poll && fds[i].fd >= 0 && gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) { fds[i].revents = pargs->fds[idx].revents; idx++; diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 2fdef06838..cb5832539d 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -183,6 +183,5 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, /* override to allow tests to hook poll() usage */ typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; -extern grpc_wakeup_fd grpc_global_wakeup_fd; #endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */ diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 8a233d0ba8..001e528409 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -106,6 +106,7 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) { gpr_mu_unlock(&g_mu); grpc_exec_ctx_flush(exec_ctx); + grpc_iomgr_platform_flush(); gpr_mu_lock(&g_mu); continue; } diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index aa92283379..1862223b77 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -104,6 +104,9 @@ struct grpc_resource_user { /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer */ grpc_closure *reclaimers[2]; + /* Reclaimers just posted: once we're in the combiner lock, we'll move them + to the array above */ + grpc_closure *new_reclaimers[2]; /* Trampoline closures to finish reclamation and re-enter the quota combiner lock */ grpc_closure post_reclaimer_closure[2]; @@ -411,9 +414,25 @@ static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *ru, rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL); } +static bool ru_post_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, + bool destructive) { + grpc_closure *closure = resource_user->new_reclaimers[destructive]; + GPR_ASSERT(closure != NULL); + resource_user->new_reclaimers[destructive] = NULL; + GPR_ASSERT(resource_user->reclaimers[destructive] == NULL); + if (gpr_atm_acq_load(&resource_user->shutdown) > 0) { + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL); + return false; + } + resource_user->reclaimers[destructive] = closure; + return true; +} + static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { grpc_resource_user *resource_user = ru; + if (!ru_post_reclaimer(exec_ctx, resource_user, false)) return; if (!rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION) && rulist_empty(resource_user->resource_quota, @@ -428,6 +447,7 @@ static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { grpc_resource_user *resource_user = ru; + if (!ru_post_reclaimer(exec_ctx, resource_user, true)) return; if (!rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION) && rulist_empty(resource_user->resource_quota, @@ -644,6 +664,8 @@ grpc_resource_user *grpc_resource_user_create( resource_user->added_to_free_pool = false; resource_user->reclaimers[0] = NULL; resource_user->reclaimers[1] = NULL; + resource_user->new_reclaimers[0] = NULL; + resource_user->new_reclaimers[1] = NULL; for (int i = 0; i < GRPC_RULIST_COUNT; i++) { resource_user->links[i].next = resource_user->links[i].prev = NULL; } @@ -743,12 +765,8 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user, bool destructive, grpc_closure *closure) { - GPR_ASSERT(resource_user->reclaimers[destructive] == NULL); - if (gpr_atm_acq_load(&resource_user->shutdown) > 0) { - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL); - return; - } - resource_user->reclaimers[destructive] = closure; + GPR_ASSERT(resource_user->new_reclaimers[destructive] == NULL); + resource_user->new_reclaimers[destructive] = closure; grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, &resource_user->post_reclaimer_closure[destructive], GRPC_ERROR_NONE, false); diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c index 35f23300dc..54911e0e31 100644 --- a/src/core/lib/iomgr/socket_windows.c +++ b/src/core/lib/iomgr/socket_windows.c @@ -76,6 +76,14 @@ void grpc_winsocket_shutdown(grpc_winsocket *winsocket) { LPFN_DISCONNECTEX DisconnectEx; DWORD ioctl_num_bytes; + gpr_mu_lock(&winsocket->state_mu); + if (winsocket->shutdown_called) { + gpr_mu_unlock(&winsocket->state_mu); + return; + } + winsocket->shutdown_called = true; + gpr_mu_unlock(&winsocket->state_mu); + status = WSAIoctl(winsocket->socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &DisconnectEx, sizeof(DisconnectEx), &ioctl_num_bytes, NULL, NULL); diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h index 490d0e0a06..a3875ce16c 100644 --- a/src/core/lib/iomgr/socket_windows.h +++ b/src/core/lib/iomgr/socket_windows.h @@ -87,6 +87,7 @@ typedef struct grpc_winsocket { grpc_winsocket_callback_info read_info; gpr_mu state_mu; + bool shutdown_called; /* You can't add the same socket twice to the same IO Completion Port. This prevents that. */ diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 7e49d89aa8..2db56b47ee 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -251,8 +251,11 @@ finish: done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); if (error != GRPC_ERROR_NONE) { - error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, - "Failed to connect to remote host"); + char *error_descr; + gpr_asprintf(&error_descr, "Failed to connect to remote host: %s", + grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION)); + error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, error_descr); + gpr_free(error_descr); error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str); } diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index 4ad417f77d..cc10060b9b 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -107,18 +107,22 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { gpr_mu_lock(&ac->mu); - if (error == GRPC_ERROR_NONE && socket != NULL) { - DWORD transfered_bytes = 0; - DWORD flags; - BOOL wsa_success = - WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped, - &transfered_bytes, FALSE, &flags); - GPR_ASSERT(transfered_bytes == 0); - if (!wsa_success) { - error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx"); + if (error == GRPC_ERROR_NONE) { + if (socket != NULL) { + DWORD transfered_bytes = 0; + DWORD flags; + BOOL wsa_success = + WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped, + &transfered_bytes, FALSE, &flags); + GPR_ASSERT(transfered_bytes == 0); + if (!wsa_success) { + error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx"); + } else { + *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name); + socket = NULL; + } } else { - *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name); - socket = NULL; + error = GRPC_ERROR_CREATE("socket is null"); } } diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index c2a6d1736e..f15fa6c0b2 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -73,6 +73,7 @@ struct grpc_tcp_listener { /* The cached AcceptEx for that port. */ LPFN_ACCEPTEX AcceptEx; int shutting_down; + int outstanding_calls; /* closure for socket notification of accept being ready */ grpc_closure on_accept; /* linked list */ @@ -140,10 +141,9 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { - if (s->shutdown_complete != NULL) { - grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); - } +static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_tcp_server *s = arg; /* Now that the accepts have been aborted, we can destroy the sockets. The IOCP won't get notified on these, so we can flag them as already @@ -159,6 +159,16 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_free(s); } +static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_tcp_server *s) { + if (s->shutdown_complete != NULL) { + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + } + + grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(destroy_server, s), + GRPC_ERROR_NONE, NULL); +} + grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { gpr_ref_non_zero(&s->refs); return s; @@ -180,17 +190,14 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { /* First, shutdown all fd's. This will queue abortion calls for all of the pending accepts due to the normal operation mechanism. */ if (s->active_ports == 0) { - immediately_done = 1; - } - for (sp = s->head; sp; sp = sp->next) { - sp->shutting_down = 1; - grpc_winsocket_shutdown(sp->socket); + finish_shutdown_locked(exec_ctx, s); + } else { + for (sp = s->head; sp; sp = sp->next) { + sp->shutting_down = 1; + grpc_winsocket_shutdown(sp->socket); + } } gpr_mu_unlock(&s->mu); - - if (immediately_done) { - finish_shutdown(exec_ctx, s); - } } void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { @@ -251,31 +258,30 @@ failure: return error; } -static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx, - grpc_tcp_listener *sp) { +static void decrement_active_ports_and_notify_locked(grpc_exec_ctx *exec_ctx, + grpc_tcp_listener *sp) { int notify = 0; sp->shutting_down = 0; - gpr_mu_lock(&sp->server->mu); GPR_ASSERT(sp->server->active_ports > 0); if (0 == --sp->server->active_ports) { - notify = 1; - } - gpr_mu_unlock(&sp->server->mu); - if (notify) { - finish_shutdown(exec_ctx, sp->server); + finish_shutdown_locked(exec_ctx, sp->server); } } /* In order to do an async accept, we need to create a socket first which will be the one assigned to the new incoming connection. */ -static grpc_error *start_accept(grpc_exec_ctx *exec_ctx, - grpc_tcp_listener *port) { +static grpc_error *start_accept_locked(grpc_exec_ctx *exec_ctx, + grpc_tcp_listener *port) { SOCKET sock = INVALID_SOCKET; BOOL success; DWORD addrlen = sizeof(struct sockaddr_in6) + 16; DWORD bytes_received = 0; grpc_error *error = GRPC_ERROR_NONE; + if (port->shutting_down) { + return GRPC_ERROR_NONE; + } + sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if (sock == INVALID_SOCKET) { @@ -305,20 +311,11 @@ static grpc_error *start_accept(grpc_exec_ctx *exec_ctx, immediately process an accept that happened in the meantime. */ port->new_socket = sock; grpc_socket_notify_on_read(exec_ctx, port->socket, &port->on_accept); + port->outstanding_calls++; return error; failure: GPR_ASSERT(error != GRPC_ERROR_NONE); - if (port->shutting_down) { - /* We are abandoning the listener port, take that into account to prevent - occasional hangs on shutdown. The hang happens when sp->shutting_down - change is not seen by on_accept and we proceed to trying new accept, - but we fail there because the listening port has been closed in the - meantime. */ - decrement_active_ports_and_notify(exec_ctx, port); - GRPC_ERROR_UNREF(error); - return GRPC_ERROR_NONE; - } if (sock != INVALID_SOCKET) closesocket(sock); return error; } @@ -338,6 +335,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { BOOL wsa_success; int err; + gpr_mu_lock(&sp->server->mu); + peer_name.len = sizeof(struct sockaddr_storage); /* The general mechanism for shutting down is to queue abortion calls. While @@ -347,6 +346,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { const char *msg = grpc_error_string(error); gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg); grpc_error_free_string(msg); + gpr_mu_unlock(&sp->server->mu); return; } @@ -356,17 +356,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, &transfered_bytes, FALSE, &flags); if (!wsa_success) { - if (sp->shutting_down) { - /* During the shutdown case, we ARE expecting an error. So that's well, - and we can wake up the shutdown thread. */ - decrement_active_ports_and_notify(exec_ctx, sp); - return; - } else { + if (!sp->shutting_down) { char *utf8_message = gpr_format_message(WSAGetLastError()); gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message); gpr_free(utf8_message); - closesocket(sock); } + closesocket(sock); } else { if (!sp->shutting_down) { peer_name_string = NULL; @@ -408,7 +403,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { the former socked we created has now either been destroy or assigned to the new connection. We need to create a new one for the next connection. */ - GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp))); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp))); + if (0 == --sp->outstanding_calls) { + decrement_active_ports_and_notify_locked(exec_ctx, sp); + } + gpr_mu_unlock(&sp->server->mu); } static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, @@ -456,6 +456,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, sp->server = s; sp->socket = grpc_winsocket_create(sock, "listener"); sp->shutting_down = 0; + sp->outstanding_calls = 0; sp->AcceptEx = AcceptEx; sp->new_socket = INVALID_SOCKET; sp->port = port; @@ -553,7 +554,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, s->on_accept_cb = on_accept_cb; s->on_accept_cb_arg = on_accept_cb_arg; for (sp = s->head; sp; sp = sp->next) { - GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp))); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp))); s->active_ports++; } gpr_mu_unlock(&s->mu); diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 34afe51df0..56fdaa5d82 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -423,7 +423,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, tcp->base.vtable = &vtable; tcp->socket = socket; gpr_mu_init(&tcp->mu); - gpr_ref_init(&tcp->refcount, 2); + gpr_ref_init(&tcp->refcount, 1); grpc_closure_init(&tcp->on_read, on_read, tcp); grpc_closure_init(&tcp->on_write, on_write, tcp); tcp->peer_string = gpr_strdup(peer_string); diff --git a/src/core/lib/support/subprocess_posix.c b/src/core/lib/support/subprocess_posix.c index 4f4de9298e..4247a1c12b 100644 --- a/src/core/lib/support/subprocess_posix.c +++ b/src/core/lib/support/subprocess_posix.c @@ -40,6 +40,7 @@ #include <assert.h> #include <errno.h> #include <signal.h> +#include <stdbool.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -52,7 +53,7 @@ struct gpr_subprocess { int pid; - int joined; + bool joined; }; const char *gpr_subprocess_binary_extension() { return ""; } @@ -97,9 +98,11 @@ retry: if (errno == EINTR) { goto retry; } - gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno)); + gpr_log(GPR_ERROR, "waitpid failed for pid %d: %s", p->pid, + strerror(errno)); return -1; } + p->joined = true; return status; } diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 89072879d9..4f49d7cf7d 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -100,7 +100,12 @@ grpc_connectivity_state grpc_connectivity_state_check( return tracker->current_state; } -int grpc_connectivity_state_notify_on_state_change( +bool grpc_connectivity_state_has_watchers( + grpc_connectivity_state_tracker *connectivity_state) { + return connectivity_state->watchers != NULL; +} + +bool grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify) { if (grpc_connectivity_state_trace) { @@ -119,7 +124,7 @@ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); tracker->watchers = w->next; gpr_free(w); - return 0; + return false; } while (w != NULL) { grpc_connectivity_state_watcher *rm_candidate = w->next; @@ -127,11 +132,11 @@ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); w->next = w->next->next; gpr_free(rm_candidate); - return 0; + return false; } w = w->next; } - return 0; + return false; } else { if (tracker->current_state != *current) { *current = tracker->current_state; diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index 7a2fa52c10..769c675b79 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -75,13 +75,16 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_error *associated_error, const char *reason); +bool grpc_connectivity_state_has_watchers( + grpc_connectivity_state_tracker *tracker); + grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker, grpc_error **current_error); /** Return 1 if the channel should start connecting, 0 otherwise. If current==NULL cancel notify if it is already queued (success==0 in that case) */ -int grpc_connectivity_state_notify_on_state_change( +bool grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify); |