diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/channel_connectivity.c | 191 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 10 | ||||
-rw-r--r-- | src/core/surface/init.c | 2 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 10 |
4 files changed, 209 insertions, 4 deletions
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c new file mode 100644 index 0000000000..b6ea86d730 --- /dev/null +++ b/src/core/surface/channel_connectivity.c @@ -0,0 +1,191 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/channel.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/channel/client_channel.h" +#include "src/core/iomgr/alarm.h" +#include "src/core/surface/completion_queue.h" + +grpc_connectivity_state grpc_channel_check_connectivity_state( + grpc_channel *channel, int try_to_connect) { + /* forward through to the underlying client channel */ + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + if (client_channel_elem->filter != &grpc_client_channel_filter) { + gpr_log(GPR_ERROR, + "grpc_channel_check_connectivity_state called on something that is " + "not a client channel, but '%s'", + client_channel_elem->filter->name); + return GRPC_CHANNEL_FATAL_FAILURE; + } + return grpc_client_channel_check_connectivity_state(client_channel_elem, + try_to_connect); +} + +typedef enum { + WAITING, + CALLING_BACK, + CALLING_BACK_AND_FINISHED, + CALLED_BACK +} callback_phase; + +typedef struct { + gpr_mu mu; + callback_phase phase; + int success; + grpc_iomgr_closure on_complete; + grpc_alarm alarm; + grpc_connectivity_state state; + grpc_connectivity_state *optional_new_state; + grpc_completion_queue *cq; + grpc_cq_completion completion_storage; + grpc_channel *channel; + void *tag; +} state_watcher; + +static void delete_state_watcher(state_watcher *w) { + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(w->channel)); + grpc_client_channel_del_interested_party(client_channel_elem, grpc_cq_pollset(w->cq)); + GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity"); + gpr_mu_destroy(&w->mu); + gpr_free(w); +} + +static void finished_completion(void *pw, grpc_cq_completion *ignored) { + int delete = 0; + state_watcher *w = pw; + gpr_mu_lock(&w->mu); + switch (w->phase) { + case WAITING: + case CALLED_BACK: + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + break; + case CALLING_BACK: + w->phase = CALLED_BACK; + break; + case CALLING_BACK_AND_FINISHED: + delete = 1; + break; + } + gpr_mu_unlock(&w->mu); + + if (delete) { + delete_state_watcher(w); + } +} + +static void partly_done(state_watcher *w, int due_to_completion) { + int delete = 0; + + if (due_to_completion) { + gpr_mu_lock(&w->mu); + w->success = 1; + gpr_mu_unlock(&w->mu); + grpc_alarm_cancel(&w->alarm); + } + + gpr_mu_lock(&w->mu); + switch (w->phase) { + case WAITING: + w->phase = CALLING_BACK; + if (w->optional_new_state) { + *w->optional_new_state = w->state; + } + grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w, + &w->completion_storage); + break; + case CALLING_BACK: + w->phase = CALLING_BACK_AND_FINISHED; + break; + case CALLING_BACK_AND_FINISHED: + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + break; + case CALLED_BACK: + delete = 1; + break; + } + gpr_mu_unlock(&w->mu); + + if (delete) { + delete_state_watcher(w); + } +} + +static void watch_complete(void *pw, int success) { partly_done(pw, 1); } + +static void timeout_complete(void *pw, int success) { partly_done(pw, 0); } + +void grpc_channel_watch_connectivity_state( + grpc_channel *channel, grpc_connectivity_state last_observed_state, + grpc_connectivity_state *optional_new_state, gpr_timespec deadline, + grpc_completion_queue *cq, void *tag) { + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + state_watcher *w = gpr_malloc(sizeof(*w)); + + grpc_cq_begin_op(cq); + + gpr_mu_init(&w->mu); + grpc_iomgr_closure_init(&w->on_complete, watch_complete, w); + w->phase = WAITING; + w->state = last_observed_state; + w->success = 0; + w->optional_new_state = optional_new_state; + w->cq = cq; + w->tag = tag; + w->channel = channel; + + grpc_alarm_init( + &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC)); + + if (client_channel_elem->filter != &grpc_client_channel_filter) { + gpr_log(GPR_ERROR, + "grpc_channel_watch_connectivity_state called on something that is " + "not a client channel, but '%s'", + client_channel_elem->filter->name); + grpc_iomgr_add_delayed_callback(&w->on_complete, 1); + } else { + GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); + grpc_client_channel_add_interested_party(client_channel_elem, grpc_cq_pollset(cq)); + grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state, + &w->on_complete); + } +} diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 1fd1855c28..707d615688 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -109,6 +109,7 @@ typedef struct { gpr_refcount refs; grpc_mdctx *mdctx; grpc_channel_args *merge_args; + grpc_channel *master; } subchannel_factory; static void subchannel_factory_ref(grpc_subchannel_factory *scf) { @@ -119,6 +120,7 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) { static void subchannel_factory_unref(grpc_subchannel_factory *scf) { subchannel_factory *f = (subchannel_factory *)scf; if (gpr_unref(&f->refs)) { + GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory"); grpc_channel_args_destroy(f->merge_args); grpc_mdctx_unref(f->mdctx); gpr_free(f); @@ -137,6 +139,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( gpr_ref_init(&c->refs, 1); args->mdctx = f->mdctx; args->args = final_args; + args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(&c->base); grpc_channel_args_destroy(final_args); @@ -168,19 +171,22 @@ grpc_channel *grpc_insecure_channel_create(const char *target, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); + channel = + grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1); + f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; gpr_ref_init(&f->refs, 1); grpc_mdctx_ref(mdctx); f->mdctx = mdctx; f->merge_args = grpc_channel_args_copy(args); + f->master = channel; + GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory"); resolver = grpc_resolver_create(target, &f->base); if (!resolver) { return NULL; } - channel = - grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); GRPC_RESOLVER_UNREF(resolver, "create"); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 5cba479317..a015262612 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -47,6 +47,7 @@ #include "src/core/surface/init.h" #include "src/core/surface/surface_trace.h" #include "src/core/transport/chttp2_transport.h" +#include "src/core/transport/connectivity_state.h" static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; @@ -75,6 +76,7 @@ void grpc_init(void) { grpc_register_tracer("http", &grpc_http_trace); grpc_register_tracer("flowctl", &grpc_flowctl_trace); grpc_register_tracer("batch", &grpc_trace_batch); + grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace); grpc_security_pre_init(); grpc_iomgr_init(); grpc_tracer_init("GRPC_TRACE"); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index a280311ba0..1f89353025 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -134,6 +134,7 @@ typedef struct { grpc_mdctx *mdctx; grpc_channel_args *merge_args; grpc_channel_security_connector *security_connector; + grpc_channel *master; } subchannel_factory; static void subchannel_factory_ref(grpc_subchannel_factory *scf) { @@ -146,6 +147,7 @@ static void subchannel_factory_unref(grpc_subchannel_factory *scf) { if (gpr_unref(&f->refs)) { GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, "subchannel_factory"); + GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory"); grpc_channel_args_destroy(f->merge_args); grpc_mdctx_unref(f->mdctx); gpr_free(f); @@ -165,6 +167,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( gpr_ref_init(&c->refs, 1); args->mdctx = f->mdctx; args->args = final_args; + args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(&c->base); grpc_channel_args_destroy(final_args); @@ -218,6 +221,9 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); + channel = + grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1); + f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; gpr_ref_init(&f->refs, 1); @@ -226,13 +232,13 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, GRPC_SECURITY_CONNECTOR_REF(&connector->base, "subchannel_factory"); f->security_connector = connector; f->merge_args = grpc_channel_args_copy(args_copy); + f->master = channel; + GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory"); resolver = grpc_resolver_create(target, &f->base); if (!resolver) { return NULL; } - channel = - grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); GRPC_RESOLVER_UNREF(resolver, "create"); |