aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-12-11 10:32:43 -0800
committerGravatar Yang Gao <yangg@google.com>2015-12-11 10:32:43 -0800
commit12fa8c83aff22c84ee92ea00c79b2f6236c93d26 (patch)
tree2c76c2ff5dacb0c5ffde91a91b87094fdd4bc79c /src/core
parentaa1ebffb323e012167d080c8375097e1bd290fc4 (diff)
parentaae3b8cddcdaa878255eed81faeab4c3940ad7f7 (diff)
Merge pull request #4325 from ctiller/ping-ping-ping-ping-ping-ping-ping-ping-ping
Ping support for channels
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/client_channel.c15
-rw-r--r--src/core/client_config/lb_policies/pick_first.c14
-rw-r--r--src/core/client_config/lb_policies/round_robin.c18
-rw-r--r--src/core/client_config/lb_policy.c5
-rw-r--r--src/core/client_config/lb_policy.h6
-rw-r--r--src/core/client_config/subchannel.c11
-rw-r--r--src/core/client_config/subchannel.h3
-rw-r--r--src/core/surface/channel_ping.c79
-rw-r--r--src/core/transport/chttp2/frame_ping.c11
-rw-r--r--src/core/transport/chttp2/internal.h7
-rw-r--r--src/core/transport/chttp2_transport.c20
11 files changed, 173 insertions, 16 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 9f993b39d6..385ae3be9b 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -252,7 +252,10 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
GPR_ASSERT(op->set_accept_stream == NULL);
- GPR_ASSERT(op->bind_pollset == NULL);
+ if (op->bind_pollset != NULL) {
+ grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties,
+ op->bind_pollset);
+ }
gpr_mu_lock(&chand->mu_config);
if (op->on_connectivity_state_change != NULL) {
@@ -263,6 +266,16 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
op->connectivity_state = NULL;
}
+ if (op->send_ping != NULL) {
+ if (chand->lb_policy == NULL) {
+ grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, 0);
+ } else {
+ grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
+ op->bind_pollset = NULL;
+ }
+ op->send_ping = NULL;
+ }
+
if (op->disconnect && chand->resolver != NULL) {
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index b91f0609d2..37de3e9f68 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -348,8 +348,20 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
+void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_closure *closure) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ gpr_mu_lock(&p->mu);
+ if (p->selected) {
+ grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
+ } else {
+ grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
+ }
+ gpr_mu_unlock(&p->mu);
+}
+
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle,
+ pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_ping_one, pf_exit_idle,
pf_check_connectivity, pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index b86dba20ee..d487456363 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -467,8 +467,24 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&p->mu);
}
+static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_closure *closure) {
+ round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
+ ready_list *selected;
+ grpc_connected_subchannel *target;
+ gpr_mu_lock(&p->mu);
+ if ((selected = peek_next_connected_locked(p))) {
+ gpr_mu_unlock(&p->mu);
+ target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
+ grpc_connected_subchannel_ping(exec_ctx, target, closure);
+ } else {
+ gpr_mu_unlock(&p->mu);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
+ }
+}
+
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
- rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle,
+ rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_ping_one, rr_exit_idle,
rr_check_connectivity, rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index d254161546..d4672f6b25 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -116,6 +116,11 @@ void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
policy->vtable->exit_idle(exec_ctx, policy);
}
+void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_closure *closure) {
+ policy->vtable->ping_one(exec_ctx, policy, closure);
+}
+
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
grpc_connectivity_state *state,
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 2f8d655558..db5238c8ca 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -63,6 +63,9 @@ struct grpc_lb_policy_vtable {
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+ void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_closure *closure);
+
/** try to enter a READY connectivity state */
void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
@@ -121,6 +124,9 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target,
grpc_closure *on_complete);
+void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_closure *closure);
+
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 6631e9bae2..afb1cdbd6d 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -461,6 +461,17 @@ void grpc_connected_subchannel_notify_on_state_change(
closure);
}
+void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *con,
+ grpc_closure *closure) {
+ grpc_transport_op op;
+ grpc_channel_element *elem;
+ memset(&op, 0, sizeof(op));
+ op.send_ping = closure;
+ elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
+ elem->filter->start_transport_op(exec_ctx, elem, &op);
+}
+
static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
size_t channel_stack_size;
grpc_connected_subchannel *con;
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 74ebcecfba..57c7c9dc67 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -124,6 +124,9 @@ void grpc_connected_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel,
grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
grpc_closure *notify);
+void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *channel,
+ grpc_closure *notify);
/** retrieve the grpc_connected_subchannel - or NULL if called before
the subchannel becomes connected */
diff --git a/src/core/surface/channel_ping.c b/src/core/surface/channel_ping.c
new file mode 100644
index 0000000000..1b6f06ded1
--- /dev/null
+++ b/src/core/surface/channel_ping.c
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/channel.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/surface/api_trace.h"
+#include "src/core/surface/completion_queue.h"
+
+typedef struct {
+ grpc_closure closure;
+ void *tag;
+ grpc_completion_queue *cq;
+ grpc_cq_completion completion_storage;
+} ping_result;
+
+static void ping_destroy(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_cq_completion *storage) {
+ gpr_free(arg);
+}
+
+static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ ping_result *pr = arg;
+ grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, success, ping_destroy, pr,
+ &pr->completion_storage);
+}
+
+void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
+ void *tag, void *reserved) {
+ grpc_transport_op op;
+ ping_result *pr = gpr_malloc(sizeof(*pr));
+ grpc_channel_element *top_elem =
+ grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GPR_ASSERT(reserved == NULL);
+ memset(&op, 0, sizeof(op));
+ pr->tag = tag;
+ pr->cq = cq;
+ grpc_closure_init(&pr->closure, ping_done, pr);
+ op.send_ping = &pr->closure;
+ op.bind_pollset = grpc_cq_pollset(cq);
+ grpc_cq_begin_op(cq);
+ top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
diff --git a/src/core/transport/chttp2/frame_ping.c b/src/core/transport/chttp2/frame_ping.c
index 4d2c54269d..8e763278ff 100644
--- a/src/core/transport/chttp2/frame_ping.c
+++ b/src/core/transport/chttp2/frame_ping.c
@@ -76,7 +76,6 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
grpc_chttp2_ping_parser *p = parser;
- grpc_chttp2_outstanding_ping *ping;
while (p->byte != 8 && cur != end) {
p->opaque_8bytes[p->byte] = *cur;
@@ -87,15 +86,7 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
if (p->byte == 8) {
GPR_ASSERT(is_last);
if (p->is_ack) {
- for (ping = transport_parsing->pings.next;
- ping != &transport_parsing->pings; ping = ping->next) {
- if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) {
- grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 1);
- }
- ping->next->prev = ping->prev;
- ping->prev->next = ping->next;
- gpr_free(ping);
- }
+ grpc_chttp2_ack_ping(exec_ctx, transport_parsing, p->opaque_8bytes);
} else {
gpr_slice_buffer_add(&transport_parsing->qbuf,
grpc_chttp2_ping_create(1, p->opaque_8bytes));
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 45d2599cdc..fc35ea6f93 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -283,9 +283,6 @@ struct grpc_chttp2_transport_parsing {
gpr_slice goaway_text;
gpr_int64 outgoing_window;
-
- /** pings awaiting responses */
- grpc_chttp2_outstanding_ping pings;
};
struct grpc_chttp2_transport {
@@ -747,4 +744,8 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs);
+void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_parsing *parsing,
+ const gpr_uint8 *opaque_8bytes);
+
#endif
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 6ba9db8348..aa459c8bac 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -901,6 +901,26 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
}
+void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_parsing *transport_parsing,
+ const gpr_uint8 *opaque_8bytes) {
+ grpc_chttp2_outstanding_ping *ping;
+ grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
+ grpc_chttp2_transport_global *transport_global = &t->global;
+ lock(t);
+ for (ping = transport_global->pings.next; ping != &transport_global->pings;
+ ping = ping->next) {
+ if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
+ grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 1);
+ ping->next->prev = ping->prev;
+ ping->prev->next = ping->next;
+ gpr_free(ping);
+ break;
+ }
+ }
+ unlock(exec_ctx, t);
+}
+
static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_transport_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;