author    Craig Tiller <ctiller@google.com>  2016-11-30 13:41:53 -0800
committer Craig Tiller <ctiller@google.com>  2016-11-30 13:41:53 -0800
commit    36b3135929cf1561d35039fc9e04e038f5351ed7 (patch)
tree      b20b4f6ab4043fae9a170adb711b488be0bd1a59 /src/core
parent    0f97958b64a792e551aa3bde84bb8f53b04de3b4 (diff)
parent    369ddc524a6ca55afb8e8bd6743ed5624e1a94ce (diff)
Merge branch 'slice_interning' into metadata_filter
Diffstat (limited to 'src/core')
-rw-r--r--  src/core/ext/client_channel/subchannel.c                     | 138
-rw-r--r--  src/core/ext/lb_policy/grpclb/grpclb.c                       |  17
-rw-r--r--  src/core/ext/transport/chttp2/transport/chttp2_transport.c  |  19
-rw-r--r--  src/core/lib/channel/http_client_filter.c                    |   6
-rw-r--r--  src/core/lib/iomgr/ev_epoll_linux.c                          |  31
-rw-r--r--  src/core/lib/iomgr/ev_poll_posix.c                           |  29
-rw-r--r--  src/core/lib/iomgr/ev_posix.h                                |   1
-rw-r--r--  src/core/lib/iomgr/iomgr.c                                   |   1
-rw-r--r--  src/core/lib/iomgr/resource_quota.c                          |  30
-rw-r--r--  src/core/lib/iomgr/socket_windows.c                          |   8
-rw-r--r--  src/core/lib/iomgr/socket_windows.h                          |   1
-rw-r--r--  src/core/lib/iomgr/tcp_client_posix.c                        |   7
-rw-r--r--  src/core/lib/iomgr/tcp_client_windows.c                      |  26
-rw-r--r--  src/core/lib/iomgr/tcp_server_windows.c                      |  86
-rw-r--r--  src/core/lib/iomgr/tcp_windows.c                             |   2
-rw-r--r--  src/core/lib/support/subprocess_posix.c                      |   7
-rw-r--r--  src/core/lib/transport/connectivity_state.c                  |  13
-rw-r--r--  src/core/lib/transport/connectivity_state.h                  |   5
18 files changed, 263 insertions(+), 164 deletions(-)
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index 063b79cece..d38f7985a5 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -120,9 +120,9 @@ struct grpc_subchannel {
gpr_mu mu;
/** have we seen a disconnection? */
- int disconnected;
+ bool disconnected;
/** are we connecting */
- int connecting;
+ bool connecting;
/** connectivity state tracking */
grpc_connectivity_state_tracker state_tracker;
@@ -133,7 +133,9 @@ struct grpc_subchannel {
/** backoff state */
gpr_backoff backoff_state;
/** do we have an active alarm? */
- int have_alarm;
+ bool have_alarm;
+ /** have we started the backoff loop */
+ bool backoff_begun;
/** our alarm */
grpc_timer alarm;
};
@@ -265,7 +267,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_subchannel_index_unregister(exec_ctx, c->key, c);
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected);
- c->disconnected = 1;
+ c->disconnected = true;
grpc_connector_shutdown(exec_ctx, c->connector);
con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
@@ -371,7 +373,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
return grpc_subchannel_index_register(exec_ctx, key, c);
}
-static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
+static void continue_connect_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
@@ -387,12 +390,6 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
&c->connected);
}
-static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
- c->next_attempt =
- gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
- continue_connect(exec_ctx, c);
-}
-
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
grpc_error **error) {
grpc_connectivity_state state;
@@ -419,6 +416,73 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
follow_up->cb(exec_ctx, follow_up->cb_arg, error);
}
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_subchannel *c = arg;
+ gpr_mu_lock(&c->mu);
+ c->have_alarm = false;
+ if (c->disconnected) {
+ error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
+ } else {
+ GRPC_ERROR_REF(error);
+ }
+ if (error == GRPC_ERROR_NONE) {
+ gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
+ c->next_attempt =
+ gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
+ continue_connect_locked(exec_ctx, c);
+ gpr_mu_unlock(&c->mu);
+ } else {
+ gpr_mu_unlock(&c->mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
+ if (c->disconnected) {
+ /* Don't try to connect if we're already disconnected */
+ return;
+ }
+
+ if (c->connecting) {
+ /* Already connecting: don't restart */
+ return;
+ }
+
+ if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != NULL) {
+ /* Already connected: don't restart */
+ return;
+ }
+
+ if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
+ /* Nobody is interested in connecting, so don't start just yet */
+ return;
+ }
+
+ c->connecting = true;
+ GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
+
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ if (!c->backoff_begun) {
+ c->backoff_begun = true;
+ c->next_attempt = gpr_backoff_begin(&c->backoff_state, now);
+ continue_connect_locked(exec_ctx, c);
+ } else {
+ GPR_ASSERT(!c->have_alarm);
+ c->have_alarm = true;
+ gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+ if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
+ 0) {
+ gpr_log(GPR_INFO, "Retry immediately");
+ } else {
+ gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
+ time_til_next.tv_sec, time_til_next.tv_nsec);
+ }
+ grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
+ }
+}
+
void grpc_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
@@ -450,13 +514,9 @@ void grpc_subchannel_notify_on_state_change(
w->next = &c->root_external_state_watcher;
w->prev = w->next->prev;
w->next->prev = w->prev->next = w;
- if (grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &c->state_tracker, state, &w->closure)) {
- c->connecting = 1;
- /* released by connection */
- GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
- start_connect(exec_ctx, c);
- }
+ grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker,
+ state, &w->closure);
+ maybe_start_connecting_locked(exec_ctx, c);
gpr_mu_unlock(&c->mu);
}
}
@@ -576,7 +636,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
Re-evaluate if we really need this. */
gpr_atm_full_barrier();
GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
- c->connecting = 0;
/* setup subchannel watching connected subchannel for changes; subchannel
ref
@@ -593,28 +652,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE, "connected");
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_subchannel *c = arg;
- gpr_mu_lock(&c->mu);
- c->have_alarm = 0;
- if (c->disconnected) {
- error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
- } else {
- GRPC_ERROR_REF(error);
- }
- if (error == GRPC_ERROR_NONE) {
- gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
- c->next_attempt =
- gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
- continue_connect(exec_ctx, c);
- gpr_mu_unlock(&c->mu);
- } else {
- gpr_mu_unlock(&c->mu);
- GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
- }
- GRPC_ERROR_UNREF(error);
-}
-
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_subchannel *c = arg;
@@ -622,35 +659,28 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
gpr_mu_lock(&c->mu);
+ c->connecting = false;
if (c->connecting_result.transport != NULL) {
publish_transport_locked(exec_ctx, c);
} else if (c->disconnected) {
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
} else {
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- GPR_ASSERT(!c->have_alarm);
- c->have_alarm = 1;
grpc_connectivity_state_set(
exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
"connect_failed");
- gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+
const char *errmsg = grpc_error_string(error);
gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
- if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
- 0) {
- gpr_log(GPR_INFO, "Retry immediately");
- } else {
- gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
- time_til_next.tv_sec, time_til_next.tv_nsec);
- }
- grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
grpc_error_free_string(errmsg);
+
+ maybe_start_connecting_locked(exec_ctx, c);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
gpr_mu_unlock(&c->mu);
- GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected");
grpc_channel_args_destroy(exec_ctx, delete_channel_args);
}
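
The subchannel changes above consolidate reconnection policy into maybe_start_connecting_locked: attempts are gated on watcher interest, backoff begins on the first attempt, and later attempts are deferred to a timer whose on_alarm callback re-enters the connect path. A minimal sketch of that pattern, using simplified stand-in types rather than the real grpc_subchannel machinery:

    #include <stdbool.h>
    #include <stdio.h>
    #include <time.h>

    /* Hypothetical stand-ins for grpc_subchannel and its backoff state. */
    typedef struct {
      bool disconnected;
      bool connecting;
      bool connected;
      bool has_watchers;
      bool backoff_begun;
      time_t next_attempt;
    } subchannel;

    static void continue_connect_locked(subchannel *c) {
      (void)c;
      printf("starting a connection attempt\n");
    }

    static void schedule_retry_alarm(subchannel *c, time_t when) {
      (void)c;
      printf("retry scheduled for t=%ld\n", (long)when);
    }

    /* Mirrors maybe_start_connecting_locked: each early return is one
       reason not to kick off a new attempt right now. */
    static void maybe_start_connecting_locked(subchannel *c, time_t now) {
      if (c->disconnected) return;  /* shut down: never reconnect */
      if (c->connecting) return;    /* an attempt is already in flight */
      if (c->connected) return;     /* nothing to do */
      if (!c->has_watchers) return; /* nobody is interested: stay idle */
      c->connecting = true;
      if (!c->backoff_begun) {
        /* First attempt: start the backoff clock (gpr_backoff_begin in
           the real code) and connect immediately. */
        c->backoff_begun = true;
        c->next_attempt = now;
        continue_connect_locked(c);
      } else {
        /* Later attempts: wait out the backoff period on a timer whose
           callback (on_alarm) steps the backoff and reconnects. */
        schedule_retry_alarm(c, c->next_attempt);
      }
    }

    int main(void) {
      subchannel c = {0};
      c.has_watchers = true; /* a watcher arrived: connecting is useful */
      maybe_start_connecting_locked(&c, time(NULL));
      return 0;
    }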
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index ab4af321f6..34fd388920 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -183,16 +183,14 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
NULL);
if (wc_arg->rr_policy != NULL) {
- /* if target is NULL, no pick has been made by the RR policy (eg, all
+ /* if *target is NULL, no pick has been made by the RR policy (eg, all
* addresses failed to connect). There won't be any user_data/token
* available */
- if (wc_arg->target != NULL) {
+ if (*wc_arg->target != NULL) {
if (!GRPC_MDISNULL(wc_arg->lb_token)) {
- GRPC_LOG_IF_ERROR(
- "grpclb.initial_metadata_add_lb_token",
- initial_metadata_add_lb_token(wc_arg->initial_metadata,
- wc_arg->lb_token_mdelem_storage,
- GRPC_MDELEM_REF(wc_arg->lb_token)));
+ initial_metadata_add_lb_token(wc_arg->initial_metadata,
+ wc_arg->lb_token_mdelem_storage,
+ GRPC_MDELEM_REF(wc_arg->lb_token));
} else {
gpr_log(GPR_ERROR,
"No LB token for connected subchannel pick %p (from RR "
@@ -1244,12 +1242,15 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
gpr_log(GPR_INFO,
"Incoming server list identical to current, ignoring.");
}
+ grpc_grpclb_destroy_serverlist(serverlist);
} else { /* new serverlist */
if (glb_policy->serverlist != NULL) {
/* dispose of the old serverlist */
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
- /* and update the copy in the glb_lb_policy instance */
+ /* and update the copy in the glb_lb_policy instance. This serverlist
+ * instance will be destroyed either upon the next update or in
+ * glb_destroy() */
glb_policy->serverlist = serverlist;
rr_handover_locked(exec_ctx, glb_policy);
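
One subtlety in the grpclb hunk: wc_arg->target is the address of the pick's output slot, so the old wc_arg->target != NULL check was always true; the fix tests the pointed-to value instead. A small illustration with hypothetical names:

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    typedef struct connected_subchannel connected_subchannel; /* opaque */

    /* The completion arg stores the *address* of the caller's slot. */
    typedef struct {
      connected_subchannel **target;
    } wrapped_closure_arg;

    static bool pick_succeeded(const wrapped_closure_arg *wc_arg) {
      /* wc_arg->target (the slot's address) is never NULL here;
         *wc_arg->target is NULL exactly when no pick was made. */
      return *wc_arg->target != NULL;
    }

    int main(void) {
      connected_subchannel *slot = NULL; /* RR policy made no pick */
      wrapped_closure_arg arg = {&slot};
      printf("picked: %s\n", pick_succeeded(&arg) ? "yes" : "no");
      return 0;
    }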
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 3e72c7cc2c..f84e5a610b 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -957,6 +957,19 @@ static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
+static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id,
+ bool is_client, bool is_initial) {
+ for (grpc_linked_mdelem *md = md_batch->list.head; md != md_batch->list.tail;
+ md = md->next) {
+ char *key = grpc_dump_slice(GRPC_MDKEY(md->md), GPR_DUMP_ASCII);
+ char *value = grpc_dump_slice(GRPC_MDVALUE(md->md), GPR_DUMP_ASCII);
+ gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
+ is_client ? "CLI" : "SVR", key, value);
+ gpr_free(key);
+ gpr_free(value);
+ }
+}
+
static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
@@ -970,6 +983,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str,
op->on_complete);
gpr_free(str);
+ if (op->send_initial_metadata) {
+ log_metadata(op->send_initial_metadata, s->id, t->is_client, true);
+ }
+ if (op->send_trailing_metadata) {
+ log_metadata(op->send_trailing_metadata, s->id, t->is_client, false);
+ }
}
grpc_closure *on_complete = op->on_complete;
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 9fd4210020..3b47147685 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -258,12 +258,15 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
message, and the payload is below the size threshold, and all the data
for this request is immediately available. */
grpc_mdelem method = GRPC_MDELEM_METHOD_POST;
- calld->send_message_blocked = false;
if ((op->send_initial_metadata_flags &
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
op->send_message != NULL &&
op->send_message->length < channeld->max_payload_size_for_get) {
method = GRPC_MDELEM_METHOD_GET;
+ /* The following write to calld->send_message_blocked isn't racy with
+ reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because
+ being here means op->send_message is not NULL, which is primarily
+ guarding the read there. */
calld->send_message_blocked = true;
} else if (op->send_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
@@ -380,6 +383,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
calld->on_done_recv_trailing_metadata = NULL;
calld->on_complete = NULL;
calld->payload_bytes = NULL;
+ calld->send_message_blocked = false;
grpc_slice_buffer_init(&calld->slices);
grpc_closure_init(&calld->hc_on_recv_initial_metadata,
hc_on_recv_initial_metadata, elem);
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 86674ca8c6..843ca42422 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -71,6 +71,11 @@ static int grpc_polling_trace = 0; /* Disabled by default */
static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;
+/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
+ * sure to wake up one polling thread (which can wake up other threads if
+ * needed) */
+static grpc_wakeup_fd global_wakeup_fd;
+
/* Implements the function defined in grpc_posix.h. This function might be
* called before even calling grpc_init() to set either a different signal to
* use. If signum == -1, then the use of signals is disabled */
@@ -436,9 +441,8 @@ static void polling_island_add_wakeup_fd_locked(polling_island *pi,
gpr_asprintf(&err_msg,
"epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
"error: %d (%s)",
- pi->epoll_fd,
- GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
- strerror(errno));
+ pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd),
+ errno, strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
}
@@ -540,7 +544,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
goto done;
}
- polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
+ polling_island_add_wakeup_fd_locked(pi, &global_wakeup_fd, error);
polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
if (initial_fd != NULL) {
@@ -842,11 +846,6 @@ static void polling_island_global_shutdown() {
* alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
* case occurs. */
-/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
- * sure to wake up one polling thread (which can wake up other threads if
- * needed) */
-grpc_wakeup_fd grpc_global_wakeup_fd;
-
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
@@ -1162,11 +1161,11 @@ static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_pollset);
gpr_tls_init(&g_current_thread_worker);
poller_kick_init();
- return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+ return grpc_wakeup_fd_init(&global_wakeup_fd);
}
static void pollset_global_shutdown(void) {
- grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ grpc_wakeup_fd_destroy(&global_wakeup_fd);
gpr_tls_destroy(&g_current_thread_pollset);
gpr_tls_destroy(&g_current_thread_worker);
}
@@ -1273,7 +1272,7 @@ static grpc_error *pollset_kick(grpc_pollset *p,
}
static grpc_error *kick_poller(void) {
- return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
+ return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
@@ -1500,13 +1499,11 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
for (int i = 0; i < ep_rv; ++i) {
void *data_ptr = ep_ev[i].data.ptr;
- if (data_ptr == &grpc_global_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
+ if (data_ptr == &global_wakeup_fd) {
+ append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else if (data_ptr == &pi->workqueue_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
+ append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
maybe_do_workqueue_work(exec_ctx, pi);
} else if (data_ptr == &polling_island_wakeup_fd) {
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index e1d620cfff..21b28e5554 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -120,6 +120,8 @@ struct grpc_fd {
grpc_pollset *read_notifier_pollset;
};
+static grpc_wakeup_fd global_wakeup_fd;
+
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
or writability interest changes, the pollset can be kicked to pick up that
@@ -769,17 +771,17 @@ static grpc_error *pollset_kick(grpc_pollset *p,
static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
gpr_tls_init(&g_current_thread_worker);
- return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+ return grpc_wakeup_fd_init(&global_wakeup_fd);
}
static void pollset_global_shutdown(void) {
- grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ grpc_wakeup_fd_destroy(&global_wakeup_fd);
gpr_tls_destroy(&g_current_thread_poller);
gpr_tls_destroy(&g_current_thread_worker);
}
static grpc_error *kick_poller(void) {
- return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
+ return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}
/* main interface */
@@ -947,7 +949,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
fd_count = 0;
pfd_count = 2;
- pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd);
pfds[0].events = POLLIN;
pfds[0].revents = 0;
pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
@@ -1001,8 +1003,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
- work_combine_error(
- &error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
+ work_combine_error(&error,
+ grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd));
}
if (pfds[1].revents & POLLIN_CHECK) {
work_combine_error(
@@ -1343,6 +1345,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
int res, idx;
gpr_cv *pollcv;
cv_node *cvn, *prev;
+ int skip_poll = 0;
nfds_t nsockfds = 0;
gpr_thd_id t_id;
gpr_thd_options opt;
@@ -1358,17 +1361,17 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
cvn->cv = pollcv;
cvn->next = g_cvfds.cvfds[idx].cvs;
g_cvfds.cvfds[idx].cvs = cvn;
- // We should return immediately if there are pending events,
- // but we still need to call poll() to check for socket events
+ // Don't bother polling if a wakeup fd is ready
if (g_cvfds.cvfds[idx].is_set) {
- timeout = 0;
+ skip_poll = 1;
}
} else if (fds[i].fd >= 0) {
nsockfds++;
}
}
- if (nsockfds > 0) {
+ res = 0;
+ if (!skip_poll && nsockfds > 0) {
pargs = gpr_malloc(sizeof(struct poll_args));
// Both the main thread and calling thread get a reference
gpr_ref_init(&pargs->refcount, 2);
@@ -1398,16 +1401,14 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
res = pargs->retval;
errno = pargs->err;
} else {
- res = 0;
errno = 0;
gpr_atm_no_barrier_store(&pargs->status, CANCELLED);
}
- } else {
+ } else if (!skip_poll) {
gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
deadline =
gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
gpr_cv_wait(pollcv, &g_cvfds.mu, deadline);
- res = 0;
}
idx = 0;
@@ -1431,7 +1432,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
fds[i].revents = POLLIN;
if (res >= 0) res++;
}
- } else if (fds[i].fd >= 0 &&
+ } else if (!skip_poll && fds[i].fd >= 0 &&
gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
fds[i].revents = pargs->fds[idx].revents;
idx++;
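
The cvfd_poll rework replaces the old timeout = 0 trick with an explicit skip_poll flag: if a wakeup fd is already set there is no reason to enter poll() or wait on the condition variable at all, and res is initialized once rather than on each branch. A condensed sketch of the resulting control flow, with the socket-polling details stubbed out as hypothetical helpers:

    #include <stdbool.h>
    #include <stdio.h>

    /* Hypothetical stand-ins for the wakeup-fd and socket bookkeeping. */
    static bool any_wakeup_fd_set(void) { return true; }
    static int poll_real_sockets(int timeout) { (void)timeout; return 0; }
    static void wait_on_condvar(int timeout_ms) { (void)timeout_ms; }

    static int cvfd_poll_sketch(int nsockfds, int timeout) {
      int res = 0; /* initialized once, not per branch */
      /* A ready wakeup fd will be reported as POLLIN regardless, so
         skip both the poll() call and the condvar wait entirely. */
      bool skip_poll = any_wakeup_fd_set();
      if (!skip_poll && nsockfds > 0) {
        res = poll_real_sockets(timeout); /* real sockets: use poll() */
      } else if (!skip_poll) {
        wait_on_condvar(timeout); /* only wakeup fds present: just wait */
      }
      return res;
    }

    int main(void) {
      printf("res = %d\n", cvfd_poll_sketch(2, 100));
      return 0;
    }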
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 2fdef06838..cb5832539d 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -183,6 +183,5 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
/* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
-extern grpc_wakeup_fd grpc_global_wakeup_fd;
#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 8a233d0ba8..001e528409 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -106,6 +106,7 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) {
if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
gpr_mu_unlock(&g_mu);
grpc_exec_ctx_flush(exec_ctx);
+ grpc_iomgr_platform_flush();
gpr_mu_lock(&g_mu);
continue;
}
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index aa92283379..1862223b77 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -104,6 +104,9 @@ struct grpc_resource_user {
/* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
*/
grpc_closure *reclaimers[2];
+ /* Reclaimers just posted: once we're in the combiner lock, we'll move them
+ to the array above */
+ grpc_closure *new_reclaimers[2];
/* Trampoline closures to finish reclamation and re-enter the quota combiner
lock */
grpc_closure post_reclaimer_closure[2];
@@ -411,9 +414,25 @@ static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *ru,
rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
}
+static bool ru_post_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user,
+ bool destructive) {
+ grpc_closure *closure = resource_user->new_reclaimers[destructive];
+ GPR_ASSERT(closure != NULL);
+ resource_user->new_reclaimers[destructive] = NULL;
+ GPR_ASSERT(resource_user->reclaimers[destructive] == NULL);
+ if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL);
+ return false;
+ }
+ resource_user->reclaimers[destructive] = closure;
+ return true;
+}
+
static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
grpc_error *error) {
grpc_resource_user *resource_user = ru;
+ if (!ru_post_reclaimer(exec_ctx, resource_user, false)) return;
if (!rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION) &&
rulist_empty(resource_user->resource_quota,
@@ -428,6 +447,7 @@ static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
grpc_error *error) {
grpc_resource_user *resource_user = ru;
+ if (!ru_post_reclaimer(exec_ctx, resource_user, true)) return;
if (!rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION) &&
rulist_empty(resource_user->resource_quota,
@@ -644,6 +664,8 @@ grpc_resource_user *grpc_resource_user_create(
resource_user->added_to_free_pool = false;
resource_user->reclaimers[0] = NULL;
resource_user->reclaimers[1] = NULL;
+ resource_user->new_reclaimers[0] = NULL;
+ resource_user->new_reclaimers[1] = NULL;
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_user->links[i].next = resource_user->links[i].prev = NULL;
}
@@ -743,12 +765,8 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user,
bool destructive,
grpc_closure *closure) {
- GPR_ASSERT(resource_user->reclaimers[destructive] == NULL);
- if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL);
- return;
- }
- resource_user->reclaimers[destructive] = closure;
+ GPR_ASSERT(resource_user->new_reclaimers[destructive] == NULL);
+ resource_user->new_reclaimers[destructive] = closure;
grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
&resource_user->post_reclaimer_closure[destructive],
GRPC_ERROR_NONE, false);
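
The resource-quota change splits reclaimer registration into two phases to avoid racing with the combiner: callers write into new_reclaimers[] immediately, and the closure running under the combiner lock moves the entry into reclaimers[] (or cancels it when the resource user is shutting down). A sketch of the shape of that hand-off, with the combiner and exec_ctx reduced to comments:

    #include <assert.h>
    #include <stdbool.h>
    #include <stddef.h>

    typedef void (*closure_fn)(void *arg, bool cancelled);
    typedef struct {
      closure_fn fn;
      void *arg;
    } closure;

    typedef struct {
      bool shutdown;
      closure *reclaimers[2];     /* owned by the combiner lock */
      closure *new_reclaimers[2]; /* staging slot written by the caller */
    } resource_user;

    /* Caller side (any thread): only touch the staging slot, then
       schedule the combiner closure that calls ru_post_reclaimer. */
    static void post_reclaimer(resource_user *ru, bool destructive,
                               closure *c) {
      assert(ru->new_reclaimers[destructive] == NULL);
      ru->new_reclaimers[destructive] = c;
    }

    /* Combiner side (serialized): safe to touch reclaimers[] here. */
    static bool ru_post_reclaimer(resource_user *ru, bool destructive) {
      closure *c = ru->new_reclaimers[destructive];
      assert(c != NULL);
      ru->new_reclaimers[destructive] = NULL;
      assert(ru->reclaimers[destructive] == NULL);
      if (ru->shutdown) {
        c->fn(c->arg, true); /* run immediately with CANCELLED */
        return false;
      }
      ru->reclaimers[destructive] = c;
      return true;
    }

    int main(void) {
      resource_user ru = {0};
      closure benign = {0};
      post_reclaimer(&ru, false, &benign);
      return ru_post_reclaimer(&ru, false) ? 0 : 1;
    }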
diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c
index 35f23300dc..54911e0e31 100644
--- a/src/core/lib/iomgr/socket_windows.c
+++ b/src/core/lib/iomgr/socket_windows.c
@@ -76,6 +76,14 @@ void grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
LPFN_DISCONNECTEX DisconnectEx;
DWORD ioctl_num_bytes;
+ gpr_mu_lock(&winsocket->state_mu);
+ if (winsocket->shutdown_called) {
+ gpr_mu_unlock(&winsocket->state_mu);
+ return;
+ }
+ winsocket->shutdown_called = true;
+ gpr_mu_unlock(&winsocket->state_mu);
+
status = WSAIoctl(winsocket->socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guid, sizeof(guid), &DisconnectEx, sizeof(DisconnectEx),
&ioctl_num_bytes, NULL, NULL);
diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h
index 490d0e0a06..a3875ce16c 100644
--- a/src/core/lib/iomgr/socket_windows.h
+++ b/src/core/lib/iomgr/socket_windows.h
@@ -87,6 +87,7 @@ typedef struct grpc_winsocket {
grpc_winsocket_callback_info read_info;
gpr_mu state_mu;
+ bool shutdown_called;
/* You can't add the same socket twice to the same IO Completion Port.
This prevents that. */
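
Together, the socket_windows.c/.h hunks add the standard "latch a flag under the lock" guard so grpc_winsocket_shutdown is safe to call more than once. A portable sketch of the idiom, with a pthread mutex standing in for gpr_mu and a hypothetical resource type:

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    /* Hypothetical resource guarded against double shutdown. */
    typedef struct {
      pthread_mutex_t state_mu;
      bool shutdown_called;
    } winsocket_like;

    static void do_platform_shutdown(winsocket_like *s) {
      (void)s;
      printf("running shutdown exactly once\n");
    }

    static void socket_shutdown(winsocket_like *s) {
      pthread_mutex_lock(&s->state_mu);
      if (s->shutdown_called) { /* a second caller: nothing to do */
        pthread_mutex_unlock(&s->state_mu);
        return;
      }
      s->shutdown_called = true; /* latch before the real work */
      pthread_mutex_unlock(&s->state_mu);
      do_platform_shutdown(s);
    }

    int main(void) {
      winsocket_like s = {PTHREAD_MUTEX_INITIALIZER, false};
      socket_shutdown(&s);
      socket_shutdown(&s); /* no-op */
      return 0;
    }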
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 7e49d89aa8..2db56b47ee 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -251,8 +251,11 @@ finish:
done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (error != GRPC_ERROR_NONE) {
- error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION,
- "Failed to connect to remote host");
+ char *error_descr;
+ gpr_asprintf(&error_descr, "Failed to connect to remote host: %s",
+ grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION));
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, error_descr);
+ gpr_free(error_descr);
error =
grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str);
}
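
Rather than replacing the error description outright, the connect path now prefixes its own context onto the underlying cause before storing it back. A sketch of that message-composition step in plain C (the real code pairs gpr_asprintf with grpc_error_get_str/grpc_error_set_str):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    /* Wrap an underlying error message with connect-specific context. */
    static char *describe_connect_failure(const char *cause) {
      const char *prefix = "Failed to connect to remote host: ";
      size_t len = strlen(prefix) + strlen(cause) + 1;
      char *msg = malloc(len);
      if (msg != NULL) snprintf(msg, len, "%s%s", prefix, cause);
      return msg; /* caller frees */
    }

    int main(void) {
      char *msg = describe_connect_failure("Connection refused");
      if (msg != NULL) {
        puts(msg);
        free(msg);
      }
      return 0;
    }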
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index 4ad417f77d..cc10060b9b 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -107,18 +107,22 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
gpr_mu_lock(&ac->mu);
- if (error == GRPC_ERROR_NONE && socket != NULL) {
- DWORD transfered_bytes = 0;
- DWORD flags;
- BOOL wsa_success =
- WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
- &transfered_bytes, FALSE, &flags);
- GPR_ASSERT(transfered_bytes == 0);
- if (!wsa_success) {
- error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
+ if (error == GRPC_ERROR_NONE) {
+ if (socket != NULL) {
+ DWORD transfered_bytes = 0;
+ DWORD flags;
+ BOOL wsa_success =
+ WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
+ &transfered_bytes, FALSE, &flags);
+ GPR_ASSERT(transfered_bytes == 0);
+ if (!wsa_success) {
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
+ } else {
+ *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name);
+ socket = NULL;
+ }
} else {
- *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name);
- socket = NULL;
+ error = GRPC_ERROR_CREATE("socket is null");
}
}
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index c2a6d1736e..f15fa6c0b2 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -73,6 +73,7 @@ struct grpc_tcp_listener {
/* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx;
int shutting_down;
+ int outstanding_calls;
/* closure for socket notification of accept being ready */
grpc_closure on_accept;
/* linked list */
@@ -140,10 +141,9 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE;
}
-static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
- if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
- }
+static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_tcp_server *s = arg;
/* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already
@@ -159,6 +159,16 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(s);
}
+static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_server *s) {
+ if (s->shutdown_complete != NULL) {
+ grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ }
+
+ grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(destroy_server, s),
+ GRPC_ERROR_NONE, NULL);
+}
+
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
gpr_ref_non_zero(&s->refs);
return s;
@@ -180,17 +190,14 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
/* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts due to the normal operation mechanism. */
if (s->active_ports == 0) {
- immediately_done = 1;
- }
- for (sp = s->head; sp; sp = sp->next) {
- sp->shutting_down = 1;
- grpc_winsocket_shutdown(sp->socket);
+ finish_shutdown_locked(exec_ctx, s);
+ } else {
+ for (sp = s->head; sp; sp = sp->next) {
+ sp->shutting_down = 1;
+ grpc_winsocket_shutdown(sp->socket);
+ }
}
gpr_mu_unlock(&s->mu);
-
- if (immediately_done) {
- finish_shutdown(exec_ctx, s);
- }
}
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
@@ -251,31 +258,30 @@ failure:
return error;
}
-static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
- grpc_tcp_listener *sp) {
+static void decrement_active_ports_and_notify_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_listener *sp) {
int notify = 0;
sp->shutting_down = 0;
- gpr_mu_lock(&sp->server->mu);
GPR_ASSERT(sp->server->active_ports > 0);
if (0 == --sp->server->active_ports) {
- notify = 1;
- }
- gpr_mu_unlock(&sp->server->mu);
- if (notify) {
- finish_shutdown(exec_ctx, sp->server);
+ finish_shutdown_locked(exec_ctx, sp->server);
}
}
/* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection. */
-static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
- grpc_tcp_listener *port) {
+static grpc_error *start_accept_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_listener *port) {
SOCKET sock = INVALID_SOCKET;
BOOL success;
DWORD addrlen = sizeof(struct sockaddr_in6) + 16;
DWORD bytes_received = 0;
grpc_error *error = GRPC_ERROR_NONE;
+ if (port->shutting_down) {
+ return GRPC_ERROR_NONE;
+ }
+
sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED);
if (sock == INVALID_SOCKET) {
@@ -305,20 +311,11 @@ static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
immediately process an accept that happened in the meantime. */
port->new_socket = sock;
grpc_socket_notify_on_read(exec_ctx, port->socket, &port->on_accept);
+ port->outstanding_calls++;
return error;
failure:
GPR_ASSERT(error != GRPC_ERROR_NONE);
- if (port->shutting_down) {
- /* We are abandoning the listener port, take that into account to prevent
- occasional hangs on shutdown. The hang happens when sp->shutting_down
- change is not seen by on_accept and we proceed to trying new accept,
- but we fail there because the listening port has been closed in the
- meantime. */
- decrement_active_ports_and_notify(exec_ctx, port);
- GRPC_ERROR_UNREF(error);
- return GRPC_ERROR_NONE;
- }
if (sock != INVALID_SOCKET) closesocket(sock);
return error;
}
@@ -338,6 +335,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
BOOL wsa_success;
int err;
+ gpr_mu_lock(&sp->server->mu);
+
peer_name.len = sizeof(struct sockaddr_storage);
/* The general mechanism for shutting down is to queue abortion calls. While
@@ -347,6 +346,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
const char *msg = grpc_error_string(error);
gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
grpc_error_free_string(msg);
+ gpr_mu_unlock(&sp->server->mu);
return;
}
@@ -356,17 +356,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
&transfered_bytes, FALSE, &flags);
if (!wsa_success) {
- if (sp->shutting_down) {
- /* During the shutdown case, we ARE expecting an error. So that's well,
- and we can wake up the shutdown thread. */
- decrement_active_ports_and_notify(exec_ctx, sp);
- return;
- } else {
+ if (!sp->shutting_down) {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
gpr_free(utf8_message);
- closesocket(sock);
}
+ closesocket(sock);
} else {
if (!sp->shutting_down) {
peer_name_string = NULL;
@@ -408,7 +403,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
the former socket we created has now either been destroyed or assigned
to the new connection. We need to create a new one for the next
connection. */
- GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp)));
+ if (0 == --sp->outstanding_calls) {
+ decrement_active_ports_and_notify_locked(exec_ctx, sp);
+ }
+ gpr_mu_unlock(&sp->server->mu);
}
static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
@@ -456,6 +456,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp->server = s;
sp->socket = grpc_winsocket_create(sock, "listener");
sp->shutting_down = 0;
+ sp->outstanding_calls = 0;
sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET;
sp->port = port;
@@ -553,7 +554,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg;
for (sp = s->head; sp; sp = sp->next) {
- GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp)));
s->active_ports++;
}
gpr_mu_unlock(&s->mu);
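
The Windows TCP server now counts outstanding accept calls per listener: start_accept_locked increments outstanding_calls, every on_accept completion decrements it, and only the completion that drains the count to zero decrements active_ports and may finish shutdown. A sketch of that accounting under a single server lock, with hypothetical types and the IOCP details elided:

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    typedef struct {
      pthread_mutex_t mu;    /* per-server lock (s->mu above) */
      int active_ports;
      int outstanding_calls; /* accepts still pending on this listener */
      bool shutting_down;
    } listener;

    static void finish_shutdown_locked(listener *l) {
      (void)l;
      printf("all accepts drained; shutdown can complete\n");
    }

    static void start_accept_locked(listener *l) {
      if (l->shutting_down) return; /* never re-arm during shutdown */
      l->outstanding_calls++;       /* one more completion to wait for */
    }

    static void on_accept(listener *l) {
      pthread_mutex_lock(&l->mu);
      /* ...consume the accepted socket, then try to re-arm... */
      start_accept_locked(l);
      if (0 == --l->outstanding_calls) {
        /* the last pending accept for this listener just finished */
        if (0 == --l->active_ports) finish_shutdown_locked(l);
      }
      pthread_mutex_unlock(&l->mu);
    }

    int main(void) {
      listener l = {PTHREAD_MUTEX_INITIALIZER, 1, 0, false};
      pthread_mutex_lock(&l.mu);
      start_accept_locked(&l); /* server start arms the first accept */
      pthread_mutex_unlock(&l.mu);
      l.shutting_down = true;  /* tcp_server_destroy flags the listener */
      on_accept(&l);           /* aborted accept completes the shutdown */
      return 0;
    }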
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 34afe51df0..56fdaa5d82 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -423,7 +423,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
tcp->base.vtable = &vtable;
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
- gpr_ref_init(&tcp->refcount, 2);
+ gpr_ref_init(&tcp->refcount, 1);
grpc_closure_init(&tcp->on_read, on_read, tcp);
grpc_closure_init(&tcp->on_write, on_write, tcp);
tcp->peer_string = gpr_strdup(peer_string);
diff --git a/src/core/lib/support/subprocess_posix.c b/src/core/lib/support/subprocess_posix.c
index 4f4de9298e..4247a1c12b 100644
--- a/src/core/lib/support/subprocess_posix.c
+++ b/src/core/lib/support/subprocess_posix.c
@@ -40,6 +40,7 @@
#include <assert.h>
#include <errno.h>
#include <signal.h>
+#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -52,7 +53,7 @@
struct gpr_subprocess {
int pid;
- int joined;
+ bool joined;
};
const char *gpr_subprocess_binary_extension() { return ""; }
@@ -97,9 +98,11 @@ retry:
if (errno == EINTR) {
goto retry;
}
- gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno));
+ gpr_log(GPR_ERROR, "waitpid failed for pid %d: %s", p->pid,
+ strerror(errno));
return -1;
}
+ p->joined = true;
return status;
}
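
The subprocess fix is mostly the missing p->joined = true on the success path plus a log line that names the pid. For context, a minimal sketch of the waitpid/EINTR retry pattern it sits inside, using a hypothetical gpr_subprocess stand-in:

    #include <errno.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/types.h>
    #include <sys/wait.h>

    typedef struct {
      pid_t pid;
      bool joined;
    } subprocess;

    /* Wait for the child, retrying when a signal interrupts the wait. */
    static int subprocess_join(subprocess *p) {
      int status;
    retry:
      if (waitpid(p->pid, &status, 0) == -1) {
        if (errno == EINTR) goto retry;
        fprintf(stderr, "waitpid failed for pid %d: %s\n", (int)p->pid,
                strerror(errno));
        return -1;
      }
      p->joined = true; /* the fix: record that the child was reaped */
      return status;
    }

    int main(void) {
      subprocess p = {-1, false}; /* no child exists: reports ECHILD */
      return subprocess_join(&p) == -1 ? 0 : 1;
    }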
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index 89072879d9..4f49d7cf7d 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -100,7 +100,12 @@ grpc_connectivity_state grpc_connectivity_state_check(
return tracker->current_state;
}
-int grpc_connectivity_state_notify_on_state_change(
+bool grpc_connectivity_state_has_watchers(
+ grpc_connectivity_state_tracker *connectivity_state) {
+ return connectivity_state->watchers != NULL;
+}
+
+bool grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify) {
if (grpc_connectivity_state_trace) {
@@ -119,7 +124,7 @@ int grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL);
tracker->watchers = w->next;
gpr_free(w);
- return 0;
+ return false;
}
while (w != NULL) {
grpc_connectivity_state_watcher *rm_candidate = w->next;
@@ -127,11 +132,11 @@ int grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL);
w->next = w->next->next;
gpr_free(rm_candidate);
- return 0;
+ return false;
}
w = w->next;
}
- return 0;
+ return false;
} else {
if (tracker->current_state != *current) {
*current = tracker->current_state;
diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h
index 7a2fa52c10..769c675b79 100644
--- a/src/core/lib/transport/connectivity_state.h
+++ b/src/core/lib/transport/connectivity_state.h
@@ -75,13 +75,16 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_error *associated_error,
const char *reason);
+bool grpc_connectivity_state_has_watchers(
+ grpc_connectivity_state_tracker *tracker);
+
grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker, grpc_error **current_error);
/** Return true if the channel should start connecting, false otherwise.
    If current==NULL, cancel notify if it is already queued (returning
    false in that case) */
-int grpc_connectivity_state_notify_on_state_change(
+bool grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify);