aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/client_channel/subchannel.c138
-rw-r--r--src/core/lib/iomgr/socket_windows.c8
-rw-r--r--src/core/lib/iomgr/socket_windows.h1
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c26
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c86
-rw-r--r--src/core/lib/iomgr/udp_server.c6
-rw-r--r--src/core/lib/support/subprocess_posix.c3
-rw-r--r--src/core/lib/transport/connectivity_state.c13
-rw-r--r--src/core/lib/transport/connectivity_state.h5
-rw-r--r--src/objective-c/tests/Podfile4
-rw-r--r--test/cpp/grpclb/grpclb_test.cc59
-rw-r--r--test/cpp/qps/json_run_localhost.cc65
-rw-r--r--tools/README.md2
-rw-r--r--tools/internal_ci/README.md6
-rw-r--r--tools/internal_ci/linux/grpc_master.cfg34
-rwxr-xr-xtools/internal_ci/linux/run_tests.sh45
-rw-r--r--tools/jenkins/README.md5
17 files changed, 346 insertions, 160 deletions
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index a148b2a0e1..b83c6882dc 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -119,9 +119,9 @@ struct grpc_subchannel {
gpr_mu mu;
/** have we seen a disconnection? */
- int disconnected;
+ bool disconnected;
/** are we connecting */
- int connecting;
+ bool connecting;
/** connectivity state tracking */
grpc_connectivity_state_tracker state_tracker;
@@ -132,7 +132,9 @@ struct grpc_subchannel {
/** backoff state */
gpr_backoff backoff_state;
/** do we have an active alarm? */
- int have_alarm;
+ bool have_alarm;
+ /** have we started the backoff loop */
+ bool backoff_begun;
/** our alarm */
grpc_timer alarm;
};
@@ -264,7 +266,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_subchannel_index_unregister(exec_ctx, c->key, c);
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected);
- c->disconnected = 1;
+ c->disconnected = true;
grpc_connector_shutdown(exec_ctx, c->connector);
con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
@@ -370,7 +372,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
return grpc_subchannel_index_register(exec_ctx, key, c);
}
-static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
+static void continue_connect_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
@@ -386,12 +389,6 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
&c->connected);
}
-static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
- c->next_attempt =
- gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
- continue_connect(exec_ctx, c);
-}
-
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
grpc_error **error) {
grpc_connectivity_state state;
@@ -418,6 +415,73 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
follow_up->cb(exec_ctx, follow_up->cb_arg, error);
}
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_subchannel *c = arg;
+ gpr_mu_lock(&c->mu);
+ c->have_alarm = false;
+ if (c->disconnected) {
+ error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
+ } else {
+ GRPC_ERROR_REF(error);
+ }
+ if (error == GRPC_ERROR_NONE) {
+ gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
+ c->next_attempt =
+ gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
+ continue_connect_locked(exec_ctx, c);
+ gpr_mu_unlock(&c->mu);
+ } else {
+ gpr_mu_unlock(&c->mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
+ if (c->disconnected) {
+ /* Don't try to connect if we're already disconnected */
+ return;
+ }
+
+ if (c->connecting) {
+ /* Already connecting: don't restart */
+ return;
+ }
+
+ if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != NULL) {
+ /* Already connected: don't restart */
+ return;
+ }
+
+ if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
+ /* Nobody is interested in connecting: so don't just yet */
+ return;
+ }
+
+ c->connecting = true;
+ GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
+
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ if (!c->backoff_begun) {
+ c->backoff_begun = true;
+ c->next_attempt = gpr_backoff_begin(&c->backoff_state, now);
+ continue_connect_locked(exec_ctx, c);
+ } else {
+ GPR_ASSERT(!c->have_alarm);
+ c->have_alarm = true;
+ gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+ if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
+ 0) {
+ gpr_log(GPR_INFO, "Retry immediately");
+ } else {
+ gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
+ time_til_next.tv_sec, time_til_next.tv_nsec);
+ }
+ grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
+ }
+}
+
void grpc_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
@@ -449,13 +513,9 @@ void grpc_subchannel_notify_on_state_change(
w->next = &c->root_external_state_watcher;
w->prev = w->next->prev;
w->next->prev = w->prev->next = w;
- if (grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &c->state_tracker, state, &w->closure)) {
- c->connecting = 1;
- /* released by connection */
- GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
- start_connect(exec_ctx, c);
- }
+ grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker,
+ state, &w->closure);
+ maybe_start_connecting_locked(exec_ctx, c);
gpr_mu_unlock(&c->mu);
}
}
@@ -575,7 +635,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
Re-evaluate if we really need this. */
gpr_atm_full_barrier();
GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
- c->connecting = 0;
/* setup subchannel watching connected subchannel for changes; subchannel
ref
@@ -592,28 +651,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE, "connected");
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_subchannel *c = arg;
- gpr_mu_lock(&c->mu);
- c->have_alarm = 0;
- if (c->disconnected) {
- error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
- } else {
- GRPC_ERROR_REF(error);
- }
- if (error == GRPC_ERROR_NONE) {
- gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
- c->next_attempt =
- gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
- continue_connect(exec_ctx, c);
- gpr_mu_unlock(&c->mu);
- } else {
- gpr_mu_unlock(&c->mu);
- GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
- }
- GRPC_ERROR_UNREF(error);
-}
-
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_subchannel *c = arg;
@@ -621,35 +658,28 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
gpr_mu_lock(&c->mu);
+ c->connecting = false;
if (c->connecting_result.transport != NULL) {
publish_transport_locked(exec_ctx, c);
} else if (c->disconnected) {
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
} else {
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- GPR_ASSERT(!c->have_alarm);
- c->have_alarm = 1;
grpc_connectivity_state_set(
exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
"connect_failed");
- gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+
const char *errmsg = grpc_error_string(error);
gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
- if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
- 0) {
- gpr_log(GPR_INFO, "Retry immediately");
- } else {
- gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
- time_til_next.tv_sec, time_til_next.tv_nsec);
- }
- grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
grpc_error_free_string(errmsg);
+
+ maybe_start_connecting_locked(exec_ctx, c);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
gpr_mu_unlock(&c->mu);
- GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected");
grpc_channel_args_destroy(delete_channel_args);
}
diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c
index 35f23300dc..54911e0e31 100644
--- a/src/core/lib/iomgr/socket_windows.c
+++ b/src/core/lib/iomgr/socket_windows.c
@@ -76,6 +76,14 @@ void grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
LPFN_DISCONNECTEX DisconnectEx;
DWORD ioctl_num_bytes;
+ gpr_mu_lock(&winsocket->state_mu);
+ if (winsocket->shutdown_called) {
+ gpr_mu_unlock(&winsocket->state_mu);
+ return;
+ }
+ winsocket->shutdown_called = true;
+ gpr_mu_unlock(&winsocket->state_mu);
+
status = WSAIoctl(winsocket->socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guid, sizeof(guid), &DisconnectEx, sizeof(DisconnectEx),
&ioctl_num_bytes, NULL, NULL);
diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h
index 490d0e0a06..a3875ce16c 100644
--- a/src/core/lib/iomgr/socket_windows.h
+++ b/src/core/lib/iomgr/socket_windows.h
@@ -87,6 +87,7 @@ typedef struct grpc_winsocket {
grpc_winsocket_callback_info read_info;
gpr_mu state_mu;
+ bool shutdown_called;
/* You can't add the same socket twice to the same IO Completion Port.
This prevents that. */
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index 4d1e809872..1127588ebc 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -107,18 +107,22 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
gpr_mu_lock(&ac->mu);
- if (error == GRPC_ERROR_NONE && socket != NULL) {
- DWORD transfered_bytes = 0;
- DWORD flags;
- BOOL wsa_success =
- WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
- &transfered_bytes, FALSE, &flags);
- GPR_ASSERT(transfered_bytes == 0);
- if (!wsa_success) {
- error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
+ if (error == GRPC_ERROR_NONE) {
+ if (socket != NULL) {
+ DWORD transfered_bytes = 0;
+ DWORD flags;
+ BOOL wsa_success =
+ WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
+ &transfered_bytes, FALSE, &flags);
+ GPR_ASSERT(transfered_bytes == 0);
+ if (!wsa_success) {
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
+ } else {
+ *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name);
+ socket = NULL;
+ }
} else {
- *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name);
- socket = NULL;
+ error = GRPC_ERROR_CREATE("socket is null");
}
}
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index ae54c70d2d..b8a391c059 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -73,6 +73,7 @@ struct grpc_tcp_listener {
/* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx;
int shutting_down;
+ int outstanding_calls;
/* closure for socket notification of accept being ready */
grpc_closure on_accept;
/* linked list */
@@ -140,10 +141,9 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE;
}
-static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
- if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
- }
+static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_tcp_server *s = arg;
/* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already
@@ -159,6 +159,16 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(s);
}
+static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_server *s) {
+ if (s->shutdown_complete != NULL) {
+ grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ }
+
+ grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(destroy_server, s),
+ GRPC_ERROR_NONE, NULL);
+}
+
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
gpr_ref_non_zero(&s->refs);
return s;
@@ -180,17 +190,14 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
/* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts due to the normal operation mechanism. */
if (s->active_ports == 0) {
- immediately_done = 1;
- }
- for (sp = s->head; sp; sp = sp->next) {
- sp->shutting_down = 1;
- grpc_winsocket_shutdown(sp->socket);
+ finish_shutdown_locked(exec_ctx, s);
+ } else {
+ for (sp = s->head; sp; sp = sp->next) {
+ sp->shutting_down = 1;
+ grpc_winsocket_shutdown(sp->socket);
+ }
}
gpr_mu_unlock(&s->mu);
-
- if (immediately_done) {
- finish_shutdown(exec_ctx, s);
- }
}
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
@@ -251,31 +258,30 @@ failure:
return error;
}
-static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
- grpc_tcp_listener *sp) {
+static void decrement_active_ports_and_notify_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_listener *sp) {
int notify = 0;
sp->shutting_down = 0;
- gpr_mu_lock(&sp->server->mu);
GPR_ASSERT(sp->server->active_ports > 0);
if (0 == --sp->server->active_ports) {
- notify = 1;
- }
- gpr_mu_unlock(&sp->server->mu);
- if (notify) {
- finish_shutdown(exec_ctx, sp->server);
+ finish_shutdown_locked(exec_ctx, sp->server);
}
}
/* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection. */
-static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
- grpc_tcp_listener *port) {
+static grpc_error *start_accept_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_listener *port) {
SOCKET sock = INVALID_SOCKET;
BOOL success;
DWORD addrlen = sizeof(struct sockaddr_in6) + 16;
DWORD bytes_received = 0;
grpc_error *error = GRPC_ERROR_NONE;
+ if (port->shutting_down) {
+ return GRPC_ERROR_NONE;
+ }
+
sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED);
if (sock == INVALID_SOCKET) {
@@ -305,20 +311,11 @@ static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
immediately process an accept that happened in the meantime. */
port->new_socket = sock;
grpc_socket_notify_on_read(exec_ctx, port->socket, &port->on_accept);
+ port->outstanding_calls++;
return error;
failure:
GPR_ASSERT(error != GRPC_ERROR_NONE);
- if (port->shutting_down) {
- /* We are abandoning the listener port, take that into account to prevent
- occasional hangs on shutdown. The hang happens when sp->shutting_down
- change is not seen by on_accept and we proceed to trying new accept,
- but we fail there because the listening port has been closed in the
- meantime. */
- decrement_active_ports_and_notify(exec_ctx, port);
- GRPC_ERROR_UNREF(error);
- return GRPC_ERROR_NONE;
- }
if (sock != INVALID_SOCKET) closesocket(sock);
return error;
}
@@ -338,6 +335,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
BOOL wsa_success;
int err;
+ gpr_mu_lock(&sp->server->mu);
+
peer_name.len = sizeof(struct sockaddr_storage);
/* The general mechanism for shutting down is to queue abortion calls. While
@@ -347,6 +346,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
const char *msg = grpc_error_string(error);
gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
grpc_error_free_string(msg);
+ gpr_mu_unlock(&sp->server->mu);
return;
}
@@ -356,17 +356,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
&transfered_bytes, FALSE, &flags);
if (!wsa_success) {
- if (sp->shutting_down) {
- /* During the shutdown case, we ARE expecting an error. So that's well,
- and we can wake up the shutdown thread. */
- decrement_active_ports_and_notify(exec_ctx, sp);
- return;
- } else {
+ if (!sp->shutting_down) {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
gpr_free(utf8_message);
- closesocket(sock);
}
+ closesocket(sock);
} else {
if (!sp->shutting_down) {
peer_name_string = NULL;
@@ -408,7 +403,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next
connection. */
- GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp)));
+ if (0 == --sp->outstanding_calls) {
+ decrement_active_ports_and_notify_locked(exec_ctx, sp);
+ }
+ gpr_mu_unlock(&sp->server->mu);
}
static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
@@ -456,6 +456,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp->server = s;
sp->socket = grpc_winsocket_create(sock, "listener");
sp->shutting_down = 0;
+ sp->outstanding_calls = 0;
sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET;
sp->port = port;
@@ -553,7 +554,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg;
for (sp = s->head; sp; sp = sp->next) {
- GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp)));
s->active_ports++;
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index fd0c7a0f9d..3c24ea9afa 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -388,7 +388,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
/* Try listening on IPv6 first. */
addr = &wild6;
// TODO(rjshade): Test and propagate the returned grpc_error*:
- grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd);
+ GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP,
+ &dsmode, &fd));
allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
@@ -402,7 +403,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
}
// TODO(rjshade): Test and propagate the returned grpc_error*:
- grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd);
+ GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP,
+ &dsmode, &fd));
if (fd < 0) {
gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
}
diff --git a/src/core/lib/support/subprocess_posix.c b/src/core/lib/support/subprocess_posix.c
index daf371d03e..4247a1c12b 100644
--- a/src/core/lib/support/subprocess_posix.c
+++ b/src/core/lib/support/subprocess_posix.c
@@ -98,7 +98,8 @@ retry:
if (errno == EINTR) {
goto retry;
}
- gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno));
+ gpr_log(GPR_ERROR, "waitpid failed for pid %d: %s", p->pid,
+ strerror(errno));
return -1;
}
p->joined = true;
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index 89072879d9..4f49d7cf7d 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -100,7 +100,12 @@ grpc_connectivity_state grpc_connectivity_state_check(
return tracker->current_state;
}
-int grpc_connectivity_state_notify_on_state_change(
+bool grpc_connectivity_state_has_watchers(
+ grpc_connectivity_state_tracker *connectivity_state) {
+ return connectivity_state->watchers != NULL;
+}
+
+bool grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify) {
if (grpc_connectivity_state_trace) {
@@ -119,7 +124,7 @@ int grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL);
tracker->watchers = w->next;
gpr_free(w);
- return 0;
+ return false;
}
while (w != NULL) {
grpc_connectivity_state_watcher *rm_candidate = w->next;
@@ -127,11 +132,11 @@ int grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL);
w->next = w->next->next;
gpr_free(rm_candidate);
- return 0;
+ return false;
}
w = w->next;
}
- return 0;
+ return false;
} else {
if (tracker->current_state != *current) {
*current = tracker->current_state;
diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h
index 7a2fa52c10..769c675b79 100644
--- a/src/core/lib/transport/connectivity_state.h
+++ b/src/core/lib/transport/connectivity_state.h
@@ -75,13 +75,16 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_error *associated_error,
const char *reason);
+bool grpc_connectivity_state_has_watchers(
+ grpc_connectivity_state_tracker *tracker);
+
grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker, grpc_error **current_error);
/** Return 1 if the channel should start connecting, 0 otherwise.
If current==NULL cancel notify if it is already queued (success==0 in that
case) */
-int grpc_connectivity_state_notify_on_state_change(
+bool grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify);
diff --git a/src/objective-c/tests/Podfile b/src/objective-c/tests/Podfile
index 17478fab12..5785b976f2 100644
--- a/src/objective-c/tests/Podfile
+++ b/src/objective-c/tests/Podfile
@@ -84,9 +84,9 @@ post_install do |installer|
end
# CocoaPods creates duplicated library targets of gRPC-Core when the test targets include
- # non-default subspecs of gRPC-Core. All of these library targets start with prefix 'gRPC-Core.'
+ # non-default subspecs of gRPC-Core. All of these library targets start with prefix 'gRPC-Core'
# and require the same error suppresion.
- if target.name == 'gRPC-Core' or target.name.start_with?('gRPC-Core.')
+ if target.name.start_with?('gRPC-Core')
target.build_configurations.each do |config|
# TODO(zyc): Remove this setting after the issue is resolved
# GPR_UNREACHABLE_CODE causes "Control may reach end of non-void
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
index 57a53ca11e..fcdcaba6a2 100644
--- a/test/cpp/grpclb/grpclb_test.cc
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -523,9 +523,8 @@ static void perform_request(client_fixture *cf) {
CQ_EXPECT_COMPLETION(cqv, tag(2), 1);
cq_verify(cqv);
+ gpr_log(GPR_INFO, "Client after sending msg %d / 4", i + 1);
GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, PAYLOAD));
- GPR_ASSERT(grpc_channel_check_connectivity_state(
- cf->client, 0 /* try to connect */) == GRPC_CHANNEL_READY);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload_recv);
@@ -546,16 +545,17 @@ static void perform_request(client_fixture *cf) {
cq_verify(cqv);
peer = grpc_call_get_peer(c);
gpr_log(GPR_INFO, "Client DONE WITH SERVER %s ", peer);
- gpr_free(peer);
grpc_call_destroy(c);
- cq_verify_empty_timeout(cqv, 1);
+ cq_verify_empty_timeout(cqv, 1 /* seconds */);
cq_verifier_destroy(cqv);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
gpr_free(details);
+ gpr_log(GPR_INFO, "Client call (peer %s) DESTROYED.", peer);
+ gpr_free(peer);
}
static void setup_client(const char *server_hostport, client_fixture *cf) {
@@ -699,39 +699,42 @@ static test_fixture test_update(int lb_server_update_delay_ms) {
TEST(GrpclbTest, Updates) {
grpc::test_fixture tf_result;
- // Clients take a bit over one second to complete a call (the last part of the
+ // Clients take at least one second to complete a call (the last part of the
// call sleeps for 1 second while verifying the client's completion queue is
- // empty). Therefore:
+ // empty), more if the system is under load. Therefore:
//
// If the LB server waits 800ms before sending an update, it will arrive
- // before the first client request is done, skipping the second server from
- // batch 1 altogether: the 2nd client request will go to the 1st server of
- // batch 2 (ie, the third one out of the four total servers).
+ // before the first client request finishes, skipping the second server from
+ // batch 1. All subsequent picks will come from the second half of the
+ // backends, those coming in the LB update.
tf_result = grpc::test_update(800);
GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 0);
- GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 2);
- GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced +
+ tf_result.lb_backends[3].num_calls_serviced >
+ 0);
+ int num_serviced_calls = 0;
+ for (int i = 0; i < 4; i++) {
+ num_serviced_calls += tf_result.lb_backends[i].num_calls_serviced;
+ }
+ GPR_ASSERT(num_serviced_calls == 4);
- // If the LB server waits 1500ms, the update arrives after having picked the
- // 2nd server from batch 1 but before the next pick for the first server of
- // batch 2. All server are used.
- tf_result = grpc::test_update(1500);
- GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
- GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1);
- GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1);
- GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
-
- // If the LB server waits > 2000ms, the update arrives after the first two
- // request are done and the third pick is performed, which returns, in RR
- // fashion, the 1st server of the 1st update. Therefore, the second server of
- // batch 1 is hit at least one, whereas the first server of batch 2 is never
- // hit.
+ // If the LB server waits 2500ms, the update arrives after two calls and three
+ // picks. The third pick will be the 1st server of the 1st update (RR policy
+ // going around). The fourth and final pick will come from the second LB
+ // update. In any case, the total number of serviced calls must again be equal
+ // to four across all the backends.
tf_result = grpc::test_update(2500);
GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced >= 1);
- GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced > 0);
- GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced > 0);
- GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0);
+ GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced +
+ tf_result.lb_backends[3].num_calls_serviced >
+ 0);
+ num_serviced_calls = 0;
+ for (int i = 0; i < 4; i++) {
+ num_serviced_calls += tf_result.lb_backends[i].num_calls_serviced;
+ }
+ GPR_ASSERT(num_serviced_calls == 4);
}
TEST(GrpclbTest, InvalidAddressInServerlist) {}
diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc
index 106509ab83..b7b2553f12 100644
--- a/test/cpp/qps/json_run_localhost.cc
+++ b/test/cpp/qps/json_run_localhost.cc
@@ -31,7 +31,11 @@
*
*/
+#include <signal.h>
+#include <string.h>
+
#include <memory>
+#include <mutex>
#include <sstream>
#include <string>
@@ -43,6 +47,11 @@
using grpc::SubProcess;
+constexpr auto kNumWorkers = 2;
+
+static SubProcess* g_driver;
+static SubProcess* g_workers[kNumWorkers];
+
template <class T>
std::string as_string(const T& val) {
std::ostringstream out;
@@ -50,6 +59,24 @@ std::string as_string(const T& val) {
return out.str();
}
+static void sighandler(int sig) {
+ const int errno_saved = errno;
+ if (g_driver != NULL) g_driver->Interrupt();
+ for (int i = 0; i < kNumWorkers; ++i) {
+ if (g_workers[i]) g_workers[i]->Interrupt();
+ }
+ errno = errno_saved;
+}
+
+static void register_sighandler() {
+ struct sigaction act;
+ memset(&act, 0, sizeof(act));
+ act.sa_handler = sighandler;
+
+ sigaction(SIGINT, &act, NULL);
+ sigaction(SIGTERM, &act, NULL);
+}
+
static void LogStatus(int status, const char* label) {
if (WIFEXITED(status)) {
gpr_log(GPR_INFO, "%s: subprocess exited with status %d", label,
@@ -63,8 +90,7 @@ static void LogStatus(int status, const char* label) {
}
int main(int argc, char** argv) {
- typedef std::unique_ptr<SubProcess> SubProcessPtr;
- std::vector<SubProcessPtr> jobs;
+ register_sighandler();
std::string my_bin = argv[0];
std::string bin_dir = my_bin.substr(0, my_bin.rfind('/'));
@@ -72,11 +98,11 @@ int main(int argc, char** argv) {
std::ostringstream env;
bool first = true;
- for (int i = 0; i < 2; i++) {
- auto port = grpc_pick_unused_port_or_die();
+ for (int i = 0; i < kNumWorkers; i++) {
+ const auto port = grpc_pick_unused_port_or_die();
std::vector<std::string> args = {bin_dir + "/qps_worker", "-driver_port",
as_string(port)};
- jobs.emplace_back(new SubProcess(args));
+ g_workers[i] = new SubProcess(args);
if (!first) env << ",";
env << "localhost:" << port;
first = false;
@@ -87,18 +113,27 @@ int main(int argc, char** argv) {
for (int i = 1; i < argc; i++) {
args.push_back(argv[i]);
}
- int status = SubProcess(args).Join();
- if (status != 0) {
- LogStatus(status, "driver");
- }
- for (auto it = jobs.begin(); it != jobs.end(); ++it) {
- (*it)->Interrupt();
+ g_driver = new SubProcess(args);
+ const int driver_join_status = g_driver->Join();
+ if (driver_join_status != 0) {
+ LogStatus(driver_join_status, "driver");
}
- for (auto it = jobs.begin(); it != jobs.end(); ++it) {
- status = (*it)->Join();
- if (status != 0) {
- LogStatus(status, "worker");
+ for (int i = 0; i < kNumWorkers; ++i) {
+ if (g_workers[i]) g_workers[i]->Interrupt();
+ }
+
+ for (int i = 0; i < kNumWorkers; ++i) {
+ if (g_workers[i]) {
+ const int worker_status = g_workers[i]->Join();
+ if (worker_status != 0) {
+ LogStatus(worker_status, "worker");
+ }
}
}
+
+ delete g_driver;
+ g_driver = NULL;
+ for (int i = 0; i < kNumWorkers; ++i) delete g_workers[i];
+ GPR_ASSERT(driver_join_status == 0);
}
diff --git a/tools/README.md b/tools/README.md
index d142d4aee2..d051846c33 100644
--- a/tools/README.md
+++ b/tools/README.md
@@ -11,6 +11,8 @@ gce: Scripts to help setup testing infrastructure on GCE.
gcp: Helper scripts for interacting with various services on GCP (like Google
container engine, BigQuery etc)
+internal_ci: Support for running tests on an internal CI platform.
+
jenkins: Support for running tests on Jenkins.
run_tests: Scripts to run gRPC tests in parallel.
diff --git a/tools/internal_ci/README.md b/tools/internal_ci/README.md
new file mode 100644
index 0000000000..8bed6ca782
--- /dev/null
+++ b/tools/internal_ci/README.md
@@ -0,0 +1,6 @@
+#Internal continuous integration
+
+gRPC's externally facing testing is managed by Jenkins CI (see `tools/jenkins`
+directory). Nevertheless, some of the tests are better suited for being run
+on internal infrastructure and using an internal CI system. Configuration for
+such tests is under this directory.
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/linux/grpc_master.cfg
new file mode 100644
index 0000000000..1f81660078
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_master.cfg
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+# Config file for the internal CI (in protobuf text format)
+
+# Location of the continuous shell script in repository.
+build_file: "grpc/tools/internal_ci/linux/run_tests.sh"
diff --git a/tools/internal_ci/linux/run_tests.sh b/tools/internal_ci/linux/run_tests.sh
new file mode 100755
index 0000000000..be477c1271
--- /dev/null
+++ b/tools/internal_ci/linux/run_tests.sh
@@ -0,0 +1,45 @@
+#!/bin/bash
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+set -ex
+
+# change to grpc repo root
+cd $(dirname $0)/../../..
+
+# TODO(jtattermusch): get rid of the system inspection eventually
+nproc || true
+lsb_release -dc || true
+gcc --version || true
+clang --version || true
+docker --version || true
+
+git submodule update --init
+
+tools/run_tests/run_tests.py -l c --build_only
diff --git a/tools/jenkins/README.md b/tools/jenkins/README.md
index 8e06b68466..02f63f0f4a 100644
--- a/tools/jenkins/README.md
+++ b/tools/jenkins/README.md
@@ -1 +1,6 @@
+# Jenkins CI scripts
+
Scripts invoked by Jenkins (our CI platform) to run gRPC test suites.
+We run a comprehensive set of tests (unit, integration, interop,
+performance, portability..) on each pull request and also periodically on
+`master` and release branches.