about | summary | refs | log | tree | commit | diff | homepage
path: root/src/core
diff options
context:
space:
mode:
author: Craig Tiller <craig.tiller@gmail.com> 2015-06-29 17:44:04 -0700
committer: Craig Tiller <craig.tiller@gmail.com> 2015-06-29 17:44:04 -0700
commit: ff3ae687e1e85d4fb29024c20a17595dce05e51f (patch)
tree: 72b3196ea99908afadd6eddb165c6d40efc3b1a7 /src/core
parent: 11bf14ec333e02cabef50d7ea61c52aa009d71b6 (diff)
Add connect retry, backoff
Diffstat (limited to 'src/core')
-rw-r--r-- src/core/client_config/subchannel.c 50
-rw-r--r-- src/core/client_config/subchannel.h 2
-rw-r--r-- src/core/surface/server.c 180
-rw-r--r-- src/core/transport/chttp2/internal.h 3
-rw-r--r-- src/core/transport/chttp2_transport.c 38
-rw-r--r-- src/core/transport/connectivity_state.c 17
-rw-r--r-- src/core/transport/connectivity_state.h 4
7 files changed, 182 insertions, 112 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 19ec1c0b4b..c8c562f29d 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -39,6 +39,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
+#include "src/core/iomgr/alarm.h"
#include "src/core/transport/connectivity_state.h"
typedef struct {
@@ -108,6 +109,15 @@ struct grpc_subchannel {
waiting_for_connect *waiting;
/** connectivity state tracking */
grpc_connectivity_state_tracker state_tracker;
+
+ /** next connect attempt time */
+ gpr_timespec next_attempt;
+ /** amount to backoff each failure */
+ gpr_timespec backoff_delta;
+ /** do we have an active alarm? */
+ int have_alarm;
+ /** our alarm */
+ grpc_alarm alarm;
};
struct grpc_subchannel_call {
@@ -259,7 +269,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
return c;
}
-static void start_connect(grpc_subchannel *c) {
+static void continue_connect(grpc_subchannel *c) {
grpc_connect_in_args args;
args.interested_parties = &c->pollset_set;
@@ -273,6 +283,14 @@ static void start_connect(grpc_subchannel *c) {
&c->connected);
}
+static void start_connect(grpc_subchannel *c) {
+ gpr_timespec now = gpr_now();
+ c->next_attempt = now;
+ c->backoff_delta = gpr_time_from_seconds(1);
+
+ continue_connect(c);
+}
+
static void continue_creating_call(void *arg, int iomgr_success) {
waiting_for_connect *w4c = arg;
grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
@@ -350,10 +368,14 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
grpc_transport_op *op) {
connection *con = NULL;
grpc_subchannel *destroy;
+ int cancel_alarm = 0;
gpr_mu_lock(&c->mu);
if (op->disconnect) {
c->disconnected = 1;
connectivity_state_changed_locked(c);
+ if (c->have_alarm) {
+ cancel_alarm = 1;
+ }
}
if (c->active != NULL) {
con = c->active;
@@ -373,6 +395,10 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
subchannel_destroy(destroy);
}
}
+
+ if (cancel_alarm) {
+ grpc_alarm_cancel(&c->alarm);
+ }
}
static void on_state_changed(void *p, int iomgr_success) {
@@ -528,18 +554,30 @@ static void publish_transport(grpc_subchannel *c) {
}
}
+static void on_alarm(void *arg, int iomgr_success) {
+ grpc_subchannel *c = arg;
+ gpr_mu_lock(&c->mu);
+ c->have_alarm = 0;
+ gpr_mu_unlock(&c->mu);
+ if (iomgr_success) {
+ continue_connect(c);
+ } else {
+ GRPC_SUBCHANNEL_UNREF(c, "connecting");
+ }
+}
+
static void subchannel_connected(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
if (c->connecting_result.transport != NULL) {
publish_transport(c);
} else {
- int destroy;
gpr_mu_lock(&c->mu);
- destroy = SUBCHANNEL_UNREF_LOCKED(c, "connecting");
+ GPR_ASSERT(!c->have_alarm);
+ c->have_alarm = 1;
+ c->next_attempt = gpr_time_add(c->next_attempt, c->backoff_delta);
+ c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta);
+ grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, gpr_now());
gpr_mu_unlock(&c->mu);
- if (destroy) subchannel_destroy(c);
- /* TODO(ctiller): retry after sleeping */
- abort();
}
}
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 5435ef703b..03bd4f63e0 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -43,6 +43,8 @@ typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
+#define GRPC_SUBCHANNEL_REFCOUNT_DEBUG
+
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p), __FILE__, __LINE__, (r))
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index f7d385c7af..383c3d921d 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -202,18 +202,86 @@ struct call_data {
call_link links[CALL_LIST_COUNT];
};
+typedef struct {
+ grpc_channel **channels;
+ size_t num_channels;
+} channel_broadcaster;
+
#define SERVER_FROM_CALL_ELEM(elem) \
(((channel_data *)(elem)->channel_data)->server)
static void begin_call(grpc_server *server, call_data *calld,
requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc);
-static void shutdown_channel(channel_data *chand, int send_goaway,
- int send_disconnect);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
hold mu_call */
static void maybe_finish_shutdown(grpc_server *server);
+/* channel broadcaster */
+
+/* assumes server locked */
+static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
+ channel_data *c;
+ size_t count = 0;
+ for (c = s->root_channel_data.next; c != &s->root_channel_data;
+ c = c->next) {
+ count ++;
+ }
+ cb->num_channels = count;
+ cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
+ count = 0;
+ for (c = s->root_channel_data.next; c != &s->root_channel_data;
+ c = c->next) {
+ cb->channels[count] = c->channel;
+ GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
+ count ++;
+ }
+}
+
+struct shutdown_cleanup_args {
+ grpc_iomgr_closure closure;
+ gpr_slice slice;
+};
+
+static void shutdown_cleanup(void *arg, int iomgr_status_ignored) {
+ struct shutdown_cleanup_args *a = arg;
+ gpr_slice_unref(a->slice);
+ gpr_free(a);
+}
+
+static void send_shutdown(grpc_channel *channel, int send_goaway, int send_disconnect) {
+ grpc_transport_op op;
+ struct shutdown_cleanup_args *sc;
+ grpc_channel_element *elem;
+
+ memset(&op, 0, sizeof(op));
+ gpr_log(GPR_DEBUG, "send_goaway:%d", send_goaway);
+ op.send_goaway = send_goaway;
+ sc = gpr_malloc(sizeof(*sc));
+ sc->slice = gpr_slice_from_copied_string("Server shutdown");
+ op.goaway_message = &sc->slice;
+ op.goaway_status = GRPC_STATUS_OK;
+ op.disconnect = send_disconnect;
+ grpc_iomgr_closure_init(&sc->closure, shutdown_cleanup, sc);
+ op.on_consumed = &sc->closure;
+
+ elem = grpc_channel_stack_element(
+ grpc_channel_get_channel_stack(channel), 0);
+ elem->filter->start_transport_op(elem, &op);
+}
+
+static void channel_broadcaster_shutdown(channel_broadcaster *cb, int send_goaway, int send_disconnect) {
+ size_t i;
+
+ for (i = 0; i < cb->num_channels; i++) {
+ send_shutdown(cb->channels[i], send_goaway, send_disconnect);
+ GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
+ }
+ gpr_free(cb->channels);
+}
+
+/* call list */
+
static int call_list_join(call_data **root, call_data *call, call_list list) {
GPR_ASSERT(!call->root[list]);
call->root[list] = root;
@@ -458,12 +526,14 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void decrement_call_count(channel_data *chand) {
+static int decrement_call_count(channel_data *chand) {
+ int disconnect = 0;
chand->num_calls--;
if (0 == chand->num_calls && chand->server->shutdown) {
- shutdown_channel(chand, 0, 1);
+ disconnect = 1;
}
maybe_finish_shutdown(chand->server);
+ return disconnect;
}
static void server_on_recv(void *ptr, int success) {
@@ -471,6 +541,7 @@ static void server_on_recv(void *ptr, int success) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int remove_res;
+ int disconnect = 0;
if (success && !calld->got_initial_metadata) {
size_t i;
@@ -519,9 +590,16 @@ static void server_on_recv(void *ptr, int success) {
gpr_mu_unlock(&chand->server->mu_call);
gpr_mu_lock(&chand->server->mu_global);
if (remove_res) {
- decrement_call_count(chand);
+ disconnect = decrement_call_count(chand);
+ if (disconnect) {
+ GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect");
+ }
}
gpr_mu_unlock(&chand->server->mu_global);
+ if (disconnect) {
+ send_shutdown(chand->channel, 0, 1);
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect");
+ }
break;
}
@@ -575,89 +653,6 @@ static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
}
}
-#if 0
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- channel_data *chand = elem->channel_data;
- grpc_server *server = chand->server;
-
- switch (op->type) {
- case GRPC_ACCEPT_CALL:
- /* create a call */
- grpc_call_create(chand->channel, NULL,
- op->data.accept_call.transport_server_data, NULL, 0,
- gpr_inf_future);
- break;
- case GRPC_TRANSPORT_CLOSED:
- /* if the transport is closed for a server channel, we destroy the
- channel */
- gpr_mu_lock(&server->mu_global);
- server_ref(server);
- destroy_channel(chand);
- gpr_mu_unlock(&server->mu_global);
- server_unref(server);
- break;
- case GRPC_TRANSPORT_GOAWAY:
- gpr_slice_unref(op->data.goaway.message);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-#endif
-
-typedef struct {
- channel_data *chand;
- int send_goaway;
- int send_disconnect;
- grpc_iomgr_closure finish_shutdown_channel_closure;
-
- /* for use during shutdown: the goaway message to send */
- gpr_slice goaway_message;
-} shutdown_channel_args;
-
-static void destroy_shutdown_channel_args(void *p, int success) {
- shutdown_channel_args *sca = p;
- GRPC_CHANNEL_INTERNAL_UNREF(sca->chand->channel, "shutdown");
- gpr_slice_unref(sca->goaway_message);
- gpr_free(sca);
-}
-
-static void finish_shutdown_channel(void *p, int success) {
- shutdown_channel_args *sca = p;
- grpc_transport_op op;
- memset(&op, 0, sizeof(op));
-
- op.send_goaway = sca->send_goaway;
- sca->goaway_message = gpr_slice_from_copied_string("Server shutdown");
- op.goaway_message = &sca->goaway_message;
- op.goaway_status = GRPC_STATUS_OK;
- op.disconnect = sca->send_disconnect;
- grpc_iomgr_closure_init(&sca->finish_shutdown_channel_closure,
- destroy_shutdown_channel_args, sca);
- op.on_consumed = &sca->finish_shutdown_channel_closure;
-
- grpc_channel_next_op(
- grpc_channel_stack_element(
- grpc_channel_get_channel_stack(sca->chand->channel), 0),
- &op);
-}
-
-static void shutdown_channel(channel_data *chand, int send_goaway,
- int send_disconnect) {
- shutdown_channel_args *sca;
- GRPC_CHANNEL_INTERNAL_REF(chand->channel, "shutdown");
- sca = gpr_malloc(sizeof(shutdown_channel_args));
- sca->chand = chand;
- sca->send_goaway = send_goaway;
- sca->send_disconnect = send_disconnect;
- sca->finish_shutdown_channel_closure.cb = finish_shutdown_channel;
- sca->finish_shutdown_channel_closure.cb_arg = sca;
- grpc_iomgr_add_callback(&sca->finish_shutdown_channel_closure);
-}
-
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_stream_op *initial_op) {
@@ -969,10 +964,10 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
requested_call_array requested_calls;
- channel_data *c;
size_t i;
registered_method *rm;
shutdown_tag *sdt;
+ channel_broadcaster broadcaster;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
@@ -988,10 +983,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
return;
}
- for (c = server->root_channel_data.next; c != &server->root_channel_data;
- c = c->next) {
- shutdown_channel(c, 1, c->num_calls == 0);
- }
+ channel_broadcaster_init(server, &broadcaster);
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
@@ -1029,6 +1021,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
for (l = server->listeners; l; l = l->next) {
l->destroy(server, l->arg);
}
+
+ channel_broadcaster_shutdown(&broadcaster, 1, 0);
}
void grpc_server_listener_destroy_done(void *s) {
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 7e2e75f97d..c8c46f0e54 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -160,7 +160,8 @@ typedef struct {
/** data to write next write */
gpr_slice_buffer qbuf;
/** queued callbacks */
- grpc_iomgr_closure *pending_closures;
+ grpc_iomgr_closure *pending_closures_head;
+ grpc_iomgr_closure *pending_closures_tail;
/** window available for us to send to peer */
gpr_uint32 outgoing_window;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 8f909dff37..08a767f1d5 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -117,6 +117,8 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t,
static void maybe_start_some_streams(
grpc_chttp2_transport_global *transport_global);
+static void connectivity_state_set(grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state);
+
/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -328,7 +330,7 @@ static void destroy_transport(grpc_transport *gt) {
static void close_transport_locked(grpc_chttp2_transport *t) {
if (!t->closed) {
t->closed = 1;
- grpc_connectivity_state_set(&t->channel_callback.state_tracker,
+ connectivity_state_set(&t->global,
GRPC_CHANNEL_FATAL_FAILURE);
if (t->ep) {
grpc_endpoint_shutdown(t->ep);
@@ -451,8 +453,9 @@ static void unlock(grpc_chttp2_transport *t) {
grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1);
}
- run_closures = t->global.pending_closures;
- t->global.pending_closures = NULL;
+ run_closures = t->global.pending_closures_head;
+ t->global.pending_closures_head = NULL;
+ t->global.pending_closures_tail = NULL;
gpr_mu_unlock(&t->mu);
@@ -523,8 +526,8 @@ void grpc_chttp2_add_incoming_goaway(
gpr_free(msg);
gpr_slice_unref(goaway_text);
transport_global->seen_goaway = 1;
- grpc_connectivity_state_set(
- &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
+ connectivity_state_set(
+ transport_global,
GRPC_CHANNEL_FATAL_FAILURE);
}
@@ -550,8 +553,7 @@ static void maybe_start_some_streams(
transport_global->next_stream_id += 2;
if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
- grpc_connectivity_state_set(&TRANSPORT_FROM_GLOBAL(transport_global)
- ->channel_callback.state_tracker,
+ connectivity_state_set(transport_global,
GRPC_CHANNEL_TRANSIENT_FAILURE);
}
@@ -933,12 +935,30 @@ static void reading_action(void *pt, int iomgr_success_ignored) {
* CALLBACK LOOP
*/
+static void schedule_closure_for_connectivity(void *a, grpc_iomgr_closure *closure) {
+ grpc_chttp2_schedule_closure(a, closure, 1);
+}
+
+static void connectivity_state_set(grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state) {
+ grpc_connectivity_state_set_with_scheduler(
+ &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
+ state,
+ schedule_closure_for_connectivity,
+ transport_global);
+}
+
void grpc_chttp2_schedule_closure(
grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success) {
closure->success = success;
- closure->next = transport_global->pending_closures;
- transport_global->pending_closures = closure;
+ if (transport_global->pending_closures_tail == NULL) {
+ transport_global->pending_closures_head =
+ transport_global->pending_closures_tail = closure;
+ } else {
+ transport_global->pending_closures_tail->next = closure;
+ transport_global->pending_closures_tail = closure;
+ }
+ closure->next = NULL;
}
/*
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index 8df08af32f..9a956a5a58 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -79,8 +79,10 @@ int grpc_connectivity_state_notify_on_state_change(
return tracker->current_state == GRPC_CHANNEL_IDLE;
}
-void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
- grpc_connectivity_state state) {
+void grpc_connectivity_state_set_with_scheduler(
+ grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state,
+ void (*scheduler)(void *arg, grpc_iomgr_closure *closure),
+ void *arg) {
grpc_connectivity_state_watcher *new = NULL;
grpc_connectivity_state_watcher *w;
/*gpr_log(GPR_DEBUG, "CS:%p:set:%d", tracker, state);*/
@@ -93,7 +95,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
if (state != *w->current) {
*w->current = state;
- grpc_iomgr_add_callback(w->notify);
+ scheduler(arg, w->notify);
gpr_free(w);
} else {
w->next = new;
@@ -102,3 +104,12 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
}
tracker->watchers = new;
}
+
+static void default_scheduler(void *ignored, grpc_iomgr_closure *closure) {
+ grpc_iomgr_add_callback(closure);
+}
+
+void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state state) {
+ grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler, NULL);
+}
diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h
index 9a8c57525f..c6f903a1ea 100644
--- a/src/core/transport/connectivity_state.h
+++ b/src/core/transport/connectivity_state.h
@@ -59,6 +59,10 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state);
+void grpc_connectivity_state_set_with_scheduler(
+ grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state,
+ void (*scheduler)(void *arg, grpc_iomgr_closure *closure),
+ void *arg);
grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker);