diff options
author | Yang Gao <yangg@google.com> | 2015-12-11 10:32:43 -0800 |
---|---|---|
committer | Yang Gao <yangg@google.com> | 2015-12-11 10:32:43 -0800 |
commit | 12fa8c83aff22c84ee92ea00c79b2f6236c93d26 (patch) | |
tree | 2c76c2ff5dacb0c5ffde91a91b87094fdd4bc79c /src/core | |
parent | aa1ebffb323e012167d080c8375097e1bd290fc4 (diff) | |
parent | aae3b8cddcdaa878255eed81faeab4c3940ad7f7 (diff) |
Merge pull request #4325 from ctiller/ping-ping-ping-ping-ping-ping-ping-ping-ping
Ping support for channels
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_channel.c | 15 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 14 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/round_robin.c | 18 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 5 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 6 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 11 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 3 | ||||
-rw-r--r-- | src/core/surface/channel_ping.c | 79 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_ping.c | 11 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 7 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 20 |
11 files changed, 173 insertions, 16 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 9f993b39d6..385ae3be9b 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -252,7 +252,10 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); GPR_ASSERT(op->set_accept_stream == NULL); - GPR_ASSERT(op->bind_pollset == NULL); + if (op->bind_pollset != NULL) { + grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, + op->bind_pollset); + } gpr_mu_lock(&chand->mu_config); if (op->on_connectivity_state_change != NULL) { @@ -263,6 +266,16 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, op->connectivity_state = NULL; } + if (op->send_ping != NULL) { + if (chand->lb_policy == NULL) { + grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, 0); + } else { + grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); + op->bind_pollset = NULL; + } + op->send_ping = NULL; + } + if (op->disconnect && chand->resolver != NULL) { grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index b91f0609d2..37de3e9f68 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -348,8 +348,20 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); } +void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (p->selected) { + grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); + } else { + grpc_exec_ctx_enqueue(exec_ctx, closure, 0); + } + gpr_mu_unlock(&p->mu); +} + static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle, + pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_ping_one, pf_exit_idle, pf_check_connectivity, pf_notify_on_state_change}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index b86dba20ee..d487456363 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -467,8 +467,24 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&p->mu); } +static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + ready_list *selected; + grpc_connected_subchannel *target; + gpr_mu_lock(&p->mu); + if ((selected = peek_next_connected_locked(p))) { + gpr_mu_unlock(&p->mu); + target = grpc_subchannel_get_connected_subchannel(selected->subchannel); + grpc_connected_subchannel_ping(exec_ctx, target, closure); + } else { + gpr_mu_unlock(&p->mu); + grpc_exec_ctx_enqueue(exec_ctx, closure, 0); + } +} + static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle, + rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_ping_one, rr_exit_idle, rr_check_connectivity, rr_notify_on_state_change}; static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index d254161546..d4672f6b25 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -116,6 +116,11 @@ void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { policy->vtable->exit_idle(exec_ctx, policy); } +void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_closure *closure) { + policy->vtable->ping_one(exec_ctx, policy, closure); +} + void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connectivity_state *state, diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 2f8d655558..db5238c8ca 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -63,6 +63,9 @@ struct grpc_lb_policy_vtable { void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connected_subchannel **target); + void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_closure *closure); + /** try to enter a READY connectivity state */ void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); @@ -121,6 +124,9 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connected_subchannel **target, grpc_closure *on_complete); +void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_closure *closure); + void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connected_subchannel **target); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 6631e9bae2..afb1cdbd6d 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -461,6 +461,17 @@ void grpc_connected_subchannel_notify_on_state_change( closure); } +void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *con, + grpc_closure *closure) { + grpc_transport_op op; + grpc_channel_element *elem; + memset(&op, 0, sizeof(op)); + op.send_ping = closure; + elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); + elem->filter->start_transport_op(exec_ctx, elem, &op); +} + static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { size_t channel_stack_size; grpc_connected_subchannel *con; diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 74ebcecfba..57c7c9dc67 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -124,6 +124,9 @@ void grpc_connected_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *notify); +void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *channel, + grpc_closure *notify); /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ diff --git a/src/core/surface/channel_ping.c b/src/core/surface/channel_ping.c new file mode 100644 index 0000000000..1b6f06ded1 --- /dev/null +++ b/src/core/surface/channel_ping.c @@ -0,0 +1,79 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/channel.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/surface/api_trace.h" +#include "src/core/surface/completion_queue.h" + +typedef struct { + grpc_closure closure; + void *tag; + grpc_completion_queue *cq; + grpc_cq_completion completion_storage; +} ping_result; + +static void ping_destroy(grpc_exec_ctx *exec_ctx, void *arg, + grpc_cq_completion *storage) { + gpr_free(arg); +} + +static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { + ping_result *pr = arg; + grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, success, ping_destroy, pr, + &pr->completion_storage); +} + +void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, + void *tag, void *reserved) { + grpc_transport_op op; + ping_result *pr = gpr_malloc(sizeof(*pr)); + grpc_channel_element *top_elem = + grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_ASSERT(reserved == NULL); + memset(&op, 0, sizeof(op)); + pr->tag = tag; + pr->cq = cq; + grpc_closure_init(&pr->closure, ping_done, pr); + op.send_ping = &pr->closure; + op.bind_pollset = grpc_cq_pollset(cq); + grpc_cq_begin_op(cq); + top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op); + grpc_exec_ctx_finish(&exec_ctx); +} diff --git a/src/core/transport/chttp2/frame_ping.c b/src/core/transport/chttp2/frame_ping.c index 4d2c54269d..8e763278ff 100644 --- a/src/core/transport/chttp2/frame_ping.c +++ b/src/core/transport/chttp2/frame_ping.c @@ -76,7 +76,6 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; grpc_chttp2_ping_parser *p = parser; - grpc_chttp2_outstanding_ping *ping; while (p->byte != 8 && cur != end) { p->opaque_8bytes[p->byte] = *cur; @@ -87,15 +86,7 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( if (p->byte == 8) { GPR_ASSERT(is_last); if (p->is_ack) { - for (ping = transport_parsing->pings.next; - ping != &transport_parsing->pings; ping = ping->next) { - if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) { - grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 1); - } - ping->next->prev = ping->prev; - ping->prev->next = ping->next; - gpr_free(ping); - } + grpc_chttp2_ack_ping(exec_ctx, transport_parsing, p->opaque_8bytes); } else { gpr_slice_buffer_add(&transport_parsing->qbuf, grpc_chttp2_ping_create(1, p->opaque_8bytes)); diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 45d2599cdc..fc35ea6f93 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -283,9 +283,6 @@ struct grpc_chttp2_transport_parsing { gpr_slice goaway_text; gpr_int64 outgoing_window; - - /** pings awaiting responses */ - grpc_chttp2_outstanding_ping pings; }; struct grpc_chttp2_transport { @@ -747,4 +744,8 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, void grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs); +void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_parsing *parsing, + const gpr_uint8 *opaque_8bytes); + #endif diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 6ba9db8348..aa459c8bac 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -901,6 +901,26 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); } +void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_parsing *transport_parsing, + const gpr_uint8 *opaque_8bytes) { + grpc_chttp2_outstanding_ping *ping; + grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); + grpc_chttp2_transport_global *transport_global = &t->global; + lock(t); + for (ping = transport_global->pings.next; ping != &transport_global->pings; + ping = ping->next) { + if (0 == memcmp(opaque_8bytes, ping->id, 8)) { + grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 1); + ping->next->prev = ping->prev; + ping->prev->next = ping->next; + gpr_free(ping); + break; + } + } + unlock(exec_ctx, t); +} + static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_transport_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; |