diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_uchannel.c | 572 | ||||
-rw-r--r-- | src/core/channel/client_uchannel.h | 70 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 47 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 11 | ||||
-rw-r--r-- | src/core/iomgr/closure.c | 10 | ||||
-rw-r--r-- | src/core/iomgr/closure.h | 9 | ||||
-rw-r--r-- | src/core/iomgr/executor.c | 148 | ||||
-rw-r--r-- | src/core/iomgr/executor.h | 53 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_posix.c | 22 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_windows.c | 26 | ||||
-rw-r--r-- | src/core/surface/channel_connectivity.c | 62 | ||||
-rw-r--r-- | src/core/surface/init.c | 3 |
12 files changed, 971 insertions, 62 deletions
diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c new file mode 100644 index 0000000000..510677a844 --- /dev/null +++ b/src/core/channel/client_uchannel.c @@ -0,0 +1,572 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/client_uchannel.h" + +#include <string.h> + +#include "src/core/census/grpc_filter.h" +#include "src/core/channel/channel_args.h" +#include "src/core/channel/client_channel.h" +#include "src/core/channel/compress_filter.h" +#include "src/core/iomgr/iomgr.h" +#include "src/core/support/string.h" +#include "src/core/surface/channel.h" +#include "src/core/transport/connectivity_state.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +/** Microchannel (uchannel) implementation: a lightweight channel without any + * load-balancing mechanisms meant for communication from within the core. */ + +typedef struct call_data call_data; + +typedef struct client_uchannel_channel_data { + /** metadata context for this channel */ + grpc_mdctx *mdctx; + + /** master channel - the grpc_channel instance that ultimately owns + this channel_data via its channel stack. + We occasionally use this to bump the refcount on the master channel + to keep ourselves alive through an asynchronous operation. */ + grpc_channel *master; + + /** connectivity state being tracked */ + grpc_connectivity_state_tracker state_tracker; + + /** the subchannel wrapped by the microchannel */ + grpc_subchannel *subchannel; + + /** the callback used to stay subscribed to subchannel connectivity + * notifications */ + grpc_closure connectivity_cb; + + /** the current connectivity state of the wrapped subchannel */ + grpc_connectivity_state subchannel_connectivity; + + gpr_mu mu_state; +} channel_data; + +typedef enum { + CALL_CREATED, + CALL_WAITING_FOR_SEND, + CALL_WAITING_FOR_CALL, + CALL_ACTIVE, + CALL_CANCELLED +} call_state; + +struct call_data { + /* owning element */ + grpc_call_element *elem; + + gpr_mu mu_state; + + call_state state; + gpr_timespec deadline; + grpc_closure async_setup_task; + grpc_transport_stream_op waiting_op; + /* our child call stack */ + grpc_subchannel_call *subchannel_call; + grpc_linked_mdelem status; + grpc_linked_mdelem details; +}; + +static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, + grpc_transport_stream_op *new_op) + GRPC_MUST_USE_RESULT; + +static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + if (op->send_ops) { + grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); + op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0); + } + if (op->recv_ops) { + char status[GPR_LTOA_MIN_BUFSIZE]; + grpc_metadata_batch mdb; + gpr_ltoa(GRPC_STATUS_CANCELLED, status); + calld->status.md = + grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status); + calld->details.md = + grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled"); + calld->status.prev = calld->details.next = NULL; + calld->status.next = &calld->details; + calld->details.prev = &calld->status; + mdb.list.head = &calld->status; + mdb.list.tail = &calld->details; + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + grpc_sopb_add_metadata(op->recv_ops, mdb); + *op->recv_state = GRPC_STREAM_CLOSED; + op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1); + } + if (op->on_consumed) { + op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0); + } +} + +typedef struct { + grpc_closure closure; + grpc_call_element *elem; +} waiting_call; + +static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op, + int continuation); + +static int is_empty(void *p, int len) { + char *ptr = p; + int i; + for (i = 0; i < len; i++) { + if (ptr[i] != 0) return 0; + } + return 1; +} + +static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { + channel_data *chand = arg; + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, + chand->subchannel_connectivity, + "uchannel_monitor_subchannel"); + grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel, + &chand->subchannel_connectivity, + &chand->connectivity_cb); +} + +static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { + call_data *calld = arg; + grpc_transport_stream_op op; + int have_waiting; + + if (calld->state == CALL_CANCELLED && iomgr_success == 0) { + have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); + gpr_mu_unlock(&calld->mu_state); + if (have_waiting) { + handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); + } + } else if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) { + memset(&op, 0, sizeof(op)); + op.cancel_with_status = GRPC_STATUS_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op); + } else if (calld->state == CALL_WAITING_FOR_CALL) { + have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); + if (calld->subchannel_call != NULL) { + calld->state = CALL_ACTIVE; + gpr_mu_unlock(&calld->mu_state); + if (have_waiting) { + grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, + &calld->waiting_op); + } + } else { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + if (have_waiting) { + handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); + } + } + } else { + GPR_ASSERT(calld->state == CALL_CANCELLED); + gpr_mu_unlock(&calld->mu_state); + have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); + if (have_waiting) { + handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); + } + } +} + +static void started_call(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { + call_data *calld = arg; + gpr_mu_lock(&calld->mu_state); + started_call_locked(exec_ctx, arg, iomgr_success); +} + +static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, + grpc_transport_stream_op *new_op) { + call_data *calld = elem->call_data; + grpc_closure *consumed_op = NULL; + grpc_transport_stream_op *waiting_op = &calld->waiting_op; + GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1); + GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1); + if (new_op->send_ops != NULL) { + waiting_op->send_ops = new_op->send_ops; + waiting_op->is_last_send = new_op->is_last_send; + waiting_op->on_done_send = new_op->on_done_send; + } + if (new_op->recv_ops != NULL) { + waiting_op->recv_ops = new_op->recv_ops; + waiting_op->recv_state = new_op->recv_state; + waiting_op->on_done_recv = new_op->on_done_recv; + } + if (new_op->on_consumed != NULL) { + if (waiting_op->on_consumed != NULL) { + consumed_op = waiting_op->on_consumed; + } + waiting_op->on_consumed = new_op->on_consumed; + } + if (new_op->cancel_with_status != GRPC_STATUS_OK) { + waiting_op->cancel_with_status = new_op->cancel_with_status; + } + return consumed_op; +} + +static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_subchannel_call *subchannel_call; + char *result; + + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_ACTIVE) { + subchannel_call = calld->subchannel_call; + GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer"); + gpr_mu_unlock(&calld->mu_state); + result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer"); + return result; + } else { + gpr_mu_unlock(&calld->mu_state); + return grpc_channel_get_target(chand->master); + } +} + +static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op, + int continuation) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_subchannel_call *subchannel_call; + grpc_transport_stream_op op2; + GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + + gpr_mu_lock(&calld->mu_state); + /* make sure the wrapped subchannel has been set (see + * grpc_client_uchannel_set_subchannel) */ + GPR_ASSERT(chand->subchannel != NULL); + + switch (calld->state) { + case CALL_ACTIVE: + GPR_ASSERT(!continuation); + subchannel_call = calld->subchannel_call; + gpr_mu_unlock(&calld->mu_state); + grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op); + break; + case CALL_CANCELLED: + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(exec_ctx, elem, op); + break; + case CALL_WAITING_FOR_SEND: + GPR_ASSERT(!continuation); + grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1); + if (!calld->waiting_op.send_ops && + calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) { + gpr_mu_unlock(&calld->mu_state); + break; + } + *op = calld->waiting_op; + memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); + continuation = 1; + /* fall through */ + case CALL_WAITING_FOR_CALL: + if (!continuation) { + if (op->cancel_with_status != GRPC_STATUS_OK) { + calld->state = CALL_CANCELLED; + op2 = calld->waiting_op; + memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); + if (op->on_consumed) { + calld->waiting_op.on_consumed = op->on_consumed; + op->on_consumed = NULL; + } else if (op2.on_consumed) { + calld->waiting_op.on_consumed = op2.on_consumed; + op2.on_consumed = NULL; + } + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(exec_ctx, elem, op); + handle_op_after_cancellation(exec_ctx, elem, &op2); + grpc_subchannel_cancel_waiting_call(exec_ctx, chand->subchannel, 1); + } else { + grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1); + gpr_mu_unlock(&calld->mu_state); + } + break; + } + /* fall through */ + case CALL_CREATED: + if (op->cancel_with_status != GRPC_STATUS_OK) { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(exec_ctx, elem, op); + } else { + calld->waiting_op = *op; + if (op->send_ops == NULL) { + calld->state = CALL_WAITING_FOR_SEND; + gpr_mu_unlock(&calld->mu_state); + } else { + grpc_subchannel_call_create_status call_creation_status; + grpc_pollset *pollset = calld->waiting_op.bind_pollset; + calld->state = CALL_WAITING_FOR_CALL; + grpc_closure_init(&calld->async_setup_task, started_call, calld); + call_creation_status = grpc_subchannel_create_call( + exec_ctx, chand->subchannel, pollset, &calld->subchannel_call, + &calld->async_setup_task); + if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) { + started_call_locked(exec_ctx, calld, 1); + } else { + gpr_mu_unlock(&calld->mu_state); + } + } + } + break; + } +} + +static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + perform_transport_stream_op(exec_ctx, elem, op, 0); +} + +static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { + channel_data *chand = elem->channel_data; + + grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); + + GPR_ASSERT(op->set_accept_stream == NULL); + GPR_ASSERT(op->bind_pollset == NULL); + + if (op->on_connectivity_state_change != NULL) { + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &chand->state_tracker, op->connectivity_state, + op->on_connectivity_state_change); + op->on_connectivity_state_change = NULL; + op->connectivity_state = NULL; + } + + if (op->disconnect) { + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); + } +} + +/* Constructor for call_data */ +static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const void *server_transport_data, + grpc_transport_stream_op *initial_op) { + call_data *calld = elem->call_data; + memset(calld, 0, sizeof(call_data)); + + /* TODO(ctiller): is there something useful we can do here? */ + GPR_ASSERT(initial_op == NULL); + + GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); + GPR_ASSERT(server_transport_data == NULL); + gpr_mu_init(&calld->mu_state); + calld->elem = elem; + calld->state = CALL_CREATED; + calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); +} + +/* Destructor for call_data */ +static void cuc_destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + call_data *calld = elem->call_data; + grpc_subchannel_call *subchannel_call; + + /* if the call got activated, we need to destroy the child stack also, and + remove it from the in-flight requests tracked by the child_entry we + picked */ + gpr_mu_lock(&calld->mu_state); + switch (calld->state) { + case CALL_ACTIVE: + subchannel_call = calld->subchannel_call; + gpr_mu_unlock(&calld->mu_state); + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_uchannel"); + break; + case CALL_CREATED: + case CALL_CANCELLED: + gpr_mu_unlock(&calld->mu_state); + break; + case CALL_WAITING_FOR_CALL: + case CALL_WAITING_FOR_SEND: + GPR_UNREACHABLE_CODE(return ); + } +} + +/* Constructor for channel_data */ +static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel *master, + const grpc_channel_args *args, + grpc_mdctx *metadata_context, int is_first, + int is_last) { + channel_data *chand = elem->channel_data; + memset(chand, 0, sizeof(*chand)); + grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand); + GPR_ASSERT(is_last); + GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); + chand->mdctx = metadata_context; + chand->master = master; + grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, + "client_uchannel"); + gpr_mu_init(&chand->mu_state); +} + +/* Destructor for channel_data */ +static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + grpc_subchannel_state_change_unsubscribe(exec_ctx, chand->subchannel, + &chand->connectivity_cb); + grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); + gpr_mu_destroy(&chand->mu_state); +} + +const grpc_channel_filter grpc_client_uchannel_filter = { + cuc_start_transport_stream_op, + cuc_start_transport_op, + sizeof(call_data), + cuc_init_call_elem, + cuc_destroy_call_elem, + sizeof(channel_data), + cuc_init_channel_elem, + cuc_destroy_channel_elem, + cuc_get_peer, + "client-uchannel", +}; + +grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { + channel_data *chand = elem->channel_data; + grpc_connectivity_state out; + out = grpc_connectivity_state_check(&chand->state_tracker); + gpr_mu_lock(&chand->mu_state); + if (out == GRPC_CHANNEL_IDLE && try_to_connect) { + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, + GRPC_CHANNEL_CONNECTING, + "uchannel_connecting_changed"); + chand->subchannel_connectivity = out; + grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel, + &chand->subchannel_connectivity, + &chand->connectivity_cb); + } + gpr_mu_unlock(&chand->mu_state); + return out; +} + +void grpc_client_uchannel_watch_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_connectivity_state *state, grpc_closure *on_complete) { + channel_data *chand = elem->channel_data; + gpr_mu_lock(&chand->mu_state); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &chand->state_tracker, state, on_complete); + gpr_mu_unlock(&chand->mu_state); +} + +grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( + grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + grpc_channel_element *parent_elem; + gpr_mu_lock(&chand->mu_state); + parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack( + grpc_subchannel_get_master(chand->subchannel))); + gpr_mu_unlock(&chand->mu_state); + return grpc_client_channel_get_connecting_pollset_set(parent_elem); +} + +void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_pollset *pollset) { + grpc_pollset_set *master_pollset_set = + grpc_client_uchannel_get_connecting_pollset_set(elem); + grpc_pollset_set_add_pollset(exec_ctx, master_pollset_set, pollset); +} + +void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_pollset *pollset) { + grpc_pollset_set *master_pollset_set = + grpc_client_uchannel_get_connecting_pollset_set(elem); + grpc_pollset_set_del_pollset(exec_ctx, master_pollset_set, pollset); +} + +grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, + grpc_channel_args *args) { + grpc_channel *channel = NULL; +#define MAX_FILTERS 3 + const grpc_channel_filter *filters[MAX_FILTERS]; + grpc_mdctx *mdctx = grpc_subchannel_get_mdctx(subchannel); + grpc_channel *master = grpc_subchannel_get_master(subchannel); + char *target = grpc_channel_get_target(master); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + size_t n = 0; + + grpc_mdctx_ref(mdctx); + if (grpc_channel_args_is_census_enabled(args)) { + filters[n++] = &grpc_client_census_filter; + } + filters[n++] = &grpc_compress_filter; + filters[n++] = &grpc_client_uchannel_filter; + GPR_ASSERT(n <= MAX_FILTERS); + + channel = grpc_channel_create_from_filters(&exec_ctx, target, filters, n, + args, mdctx, 1); + + gpr_free(target); + return channel; +} + +void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel, + grpc_subchannel *subchannel) { + grpc_channel_element *elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel)); + channel_data *chand = elem->channel_data; + GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); + gpr_mu_lock(&chand->mu_state); + chand->subchannel = subchannel; + gpr_mu_unlock(&chand->mu_state); +} diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h new file mode 100644 index 0000000000..dfe6695ae3 --- /dev/null +++ b/src/core/channel/client_uchannel.h @@ -0,0 +1,70 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H +#define GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H + +#include "src/core/channel/channel_stack.h" +#include "src/core/client_config/resolver.h" + +#define GRPC_MICROCHANNEL_SUBCHANNEL_ARG "grpc.microchannel_subchannel_key" + +/* A client microchannel (aka uchannel) is a channel wrapping a subchannel, for + * the purposes of lightweight RPC communications from within the core.*/ + +extern const grpc_channel_filter grpc_client_uchannel_filter; + +grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); + +void grpc_client_uchannel_watch_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_connectivity_state *state, grpc_closure *on_complete); + +grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( + grpc_channel_element *elem); + +void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx, + grpc_channel_element *channel, + grpc_pollset *pollset); +void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, + grpc_channel_element *channel, + grpc_pollset *pollset); + +grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, + grpc_channel_args *args); + +void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel, + grpc_subchannel *subchannel); + +#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H */ diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 095000ba4f..0401dd3868 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -312,6 +312,29 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, return c; } +void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + int iomgr_success) { + waiting_for_connect *w4c; + gpr_mu_lock(&subchannel->mu); + w4c = subchannel->waiting; + subchannel->waiting = NULL; + gpr_mu_unlock(&subchannel->mu); + while (w4c != NULL) { + waiting_for_connect *next = w4c->next; + grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, + w4c->pollset); + if (w4c->notify) { + w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success); + } + + GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); + gpr_free(w4c); + + w4c = next; + } +} + static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; @@ -659,24 +682,12 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { iomgr_success = 0; } connectivity_state_changed_locked(exec_ctx, c, "alarm"); + gpr_mu_unlock(&c->mu); if (iomgr_success) { - gpr_mu_unlock(&c->mu); update_reconnect_parameters(c); continue_connect(exec_ctx, c); } else { - waiting_for_connect *w4c; - w4c = c->waiting; - c->waiting = NULL; - gpr_mu_unlock(&c->mu); - while (w4c != NULL) { - waiting_for_connect *next = w4c->next; - grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, - w4c->pollset); - w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, 0); - GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); - gpr_free(w4c); - w4c = next; - } + grpc_subchannel_cancel_waiting_call(exec_ctx, c, iomgr_success); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); } @@ -784,3 +795,11 @@ static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx, grpc_call_stack_init(exec_ctx, chanstk, NULL, NULL, callstk); return call; } + +grpc_mdctx *grpc_subchannel_get_mdctx(grpc_subchannel *subchannel) { + return subchannel->mdctx; +} + +grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel) { + return subchannel->master; +} diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index a26d08f02e..ec1cc7cc69 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -92,6 +92,11 @@ grpc_subchannel_call_create_status grpc_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset, grpc_subchannel_call **target, grpc_closure *notify); +/** cancel \a call in the waiting state. */ +void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + int iomgr_success); + /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, @@ -154,4 +159,10 @@ struct grpc_subchannel_args { grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel_args *args); +/** Return the metadata context associated with the subchannel */ +grpc_mdctx *grpc_subchannel_get_mdctx(grpc_subchannel *subchannel); + +/** Return the master channel associated with the subchannel */ +grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel); + #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */ diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c index d91681990f..b4f1817de4 100644 --- a/src/core/iomgr/closure.c +++ b/src/core/iomgr/closure.c @@ -72,6 +72,16 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) { src->head = src->tail = NULL; } +grpc_closure *grpc_closure_list_pop(grpc_closure_list *list) { + grpc_closure *head; + if (list->head == NULL) { + return NULL; + } + head = list->head; + list->head = list->head->next; + return head; +} + typedef struct { grpc_iomgr_cb_func cb; void *cb_arg; diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h index d812659af0..7a9f7ccad0 100644 --- a/src/core/iomgr/closure.h +++ b/src/core/iomgr/closure.h @@ -83,9 +83,18 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg); #define GRPC_CLOSURE_LIST_INIT \ { NULL, NULL } +/** add \a closure to the end of \a list and set \a closure's success to \a + * success */ void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure, int success); + +/** append all closures from \a src to \a dst and empty \a src. */ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); + +/** pop (return and remove) the head closure from \a list. */ +grpc_closure *grpc_closure_list_pop(grpc_closure_list *list); + +/** return whether \a list is empty. */ int grpc_closure_list_empty(grpc_closure_list list); #endif /* GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H */ diff --git a/src/core/iomgr/executor.c b/src/core/iomgr/executor.c new file mode 100644 index 0000000000..457e5cdbac --- /dev/null +++ b/src/core/iomgr/executor.c @@ -0,0 +1,148 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/executor.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include "src/core/iomgr/exec_ctx.h" + +typedef struct grpc_executor_data { + int busy; /**< is the thread currently running? */ + int shutting_down; /**< has \a grpc_shutdown() been invoked? */ + int pending_join; /**< has the thread finished but not been joined? */ + grpc_closure_list closures; /**< collection of pending work */ + gpr_thd_id tid; /**< thread id of the thread, only valid if \a busy or \a + pending_join are true */ + gpr_thd_options options; + gpr_mu mu; +} grpc_executor; + +static grpc_executor g_executor; + +void grpc_executor_init() { + memset(&g_executor, 0, sizeof(grpc_executor)); + gpr_mu_init(&g_executor.mu); + g_executor.options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&g_executor.options); +} + +/* thread body */ +static void closure_exec_thread_func(void *ignored) { + grpc_closure *closure; + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + while (1) { + gpr_mu_lock(&g_executor.mu); + if (g_executor.shutting_down != 0) { + gpr_mu_unlock(&g_executor.mu); + break; + } + closure = grpc_closure_list_pop(&g_executor.closures); + if (closure == NULL) { + /* no more work, time to die */ + GPR_ASSERT(g_executor.busy == 1); + g_executor.busy = 0; + gpr_mu_unlock(&g_executor.mu); + break; + } + gpr_mu_unlock(&g_executor.mu); + closure->cb(&exec_ctx, closure->cb_arg, closure->success); + grpc_exec_ctx_flush(&exec_ctx); + } + grpc_exec_ctx_finish(&exec_ctx); +} + +/* Spawn the thread if new work has arrived a no thread is up */ +static void maybe_spawn_locked() { + if (grpc_closure_list_empty(g_executor.closures) == 1) { + return; + } + if (g_executor.shutting_down == 1) { + return; + } + + if (g_executor.busy != 0) { + /* Thread still working. New work will be picked up by already running + * thread. Not spawning anything. */ + return; + } else if (g_executor.pending_join != 0) { + /* Pickup the remains of the previous incarnations of the thread. */ + gpr_thd_join(g_executor.tid); + g_executor.pending_join = 0; + } + + /* All previous instances of the thread should have been joined at this point. + * Spawn time! */ + g_executor.busy = 1; + gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL, + &g_executor.options); + g_executor.pending_join = 1; +} + +void grpc_executor_enqueue(grpc_closure *closure, int success) { + gpr_mu_lock(&g_executor.mu); + if (g_executor.shutting_down == 0) { + grpc_closure_list_add(&g_executor.closures, closure, success); + maybe_spawn_locked(); + } + gpr_mu_unlock(&g_executor.mu); +} + +void grpc_executor_shutdown() { + int pending_join; + grpc_closure *closure; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + gpr_mu_lock(&g_executor.mu); + pending_join = g_executor.pending_join; + g_executor.shutting_down = 1; + gpr_mu_unlock(&g_executor.mu); + /* we can release the lock at this point despite the access to the closure + * list below because we aren't accepting new work */ + + /* Execute pending callbacks, some may be performing cleanups */ + while ((closure = grpc_closure_list_pop(&g_executor.closures)) != NULL) { + closure->cb(&exec_ctx, closure->cb_arg, closure->success); + } + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(grpc_closure_list_empty(g_executor.closures)); + if (pending_join) { + gpr_thd_join(g_executor.tid); + } + gpr_mu_destroy(&g_executor.mu); +} diff --git a/src/core/iomgr/executor.h b/src/core/iomgr/executor.h new file mode 100644 index 0000000000..6da446ae9c --- /dev/null +++ b/src/core/iomgr/executor.h @@ -0,0 +1,53 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H +#define GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H + +#include "src/core/iomgr/closure.h" + +/** Initialize the global executor. + * + * This mechanism is meant to outsource work (grpc_closure instances) to a + * thread, for those cases where blocking isn't an option but there isn't a + * non-blocking solution available. */ +void grpc_executor_init(); + +/** Enqueue \a closure for its eventual execution of \a f(arg) on a separate + * thread */ +void grpc_executor_enqueue(grpc_closure *closure, int success); + +/** Shutdown the executor, running all pending work as part of the call */ +void grpc_executor_shutdown(); + +#endif /* GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index ed0a93fcc9..555c74ce7e 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -41,6 +41,7 @@ #include <sys/un.h> #include <string.h> +#include "src/core/iomgr/executor.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/support/block_annotate.h" @@ -57,8 +58,8 @@ typedef struct { char *name; char *default_port; grpc_resolve_cb cb; + grpc_closure request_closure; void *arg; - grpc_iomgr_object iomgr_object; } request; grpc_resolved_addresses *grpc_blocking_resolve_address( @@ -149,20 +150,18 @@ done: return addrs; } -/* Thread function to asynch-ify grpc_blocking_resolve_address */ -static void do_request_thread(void *rp) { +/* Callback to be passed to grpc_executor to asynch-ify + * grpc_blocking_resolve_address */ +static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, int success) { request *r = rp; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_addresses *resolved = grpc_blocking_resolve_address(r->name, r->default_port); void *arg = r->arg; grpc_resolve_cb cb = r->cb; gpr_free(r->name); gpr_free(r->default_port); - cb(&exec_ctx, arg, resolved); - grpc_iomgr_unregister_object(&r->iomgr_object); + cb(exec_ctx, arg, resolved); gpr_free(r); - grpc_exec_ctx_finish(&exec_ctx); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { @@ -173,17 +172,12 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { void grpc_resolve_address(const char *name, const char *default_port, grpc_resolve_cb cb, void *arg) { request *r = gpr_malloc(sizeof(request)); - gpr_thd_id id; - char *tmp; - gpr_asprintf(&tmp, "resolve_address:name='%s':default_port='%s'", name, - default_port); - grpc_iomgr_register_object(&r->iomgr_object, tmp); - gpr_free(tmp); + grpc_closure_init(&r->request_closure, do_request_thread, r); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->cb = cb; r->arg = arg; - gpr_thd_new(&id, do_request_thread, r, NULL); + grpc_executor_enqueue(&r->request_closure, 1); } #endif diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c index 82a5602996..007c855d10 100644 --- a/src/core/iomgr/resolve_address_windows.c +++ b/src/core/iomgr/resolve_address_windows.c @@ -40,6 +40,7 @@ #include <sys/types.h> #include <string.h> +#include "src/core/iomgr/executor.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/support/block_annotate.h" @@ -47,6 +48,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> +#include <grpc/support/log_win32.h> #include <grpc/support/string_util.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> @@ -55,8 +57,8 @@ typedef struct { char *name; char *default_port; grpc_resolve_cb cb; + grpc_closure request_closure; void *arg; - grpc_iomgr_object iomgr_object; } request; grpc_resolved_addresses *grpc_blocking_resolve_address( @@ -93,7 +95,9 @@ grpc_resolved_addresses *grpc_blocking_resolve_address( s = getaddrinfo(host, port, &hints, &result); GRPC_SCHEDULING_END_BLOCKING_REGION; if (s != 0) { - gpr_log(GPR_ERROR, "getaddrinfo: %s", gai_strerror(s)); + char *error_message = gpr_format_message(s); + gpr_log(GPR_ERROR, "getaddrinfo: %s", error_message); + gpr_free(error_message); goto done; } @@ -129,9 +133,9 @@ done: return addrs; } -/* Thread function to asynch-ify grpc_blocking_resolve_address */ -static void do_request(void *rp) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; +/* Callback to be passed to grpc_executor to asynch-ify + * grpc_blocking_resolve_address */ +static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, int success) { request *r = rp; grpc_resolved_addresses *resolved = grpc_blocking_resolve_address(r->name, r->default_port); @@ -139,10 +143,8 @@ static void do_request(void *rp) { grpc_resolve_cb cb = r->cb; gpr_free(r->name); gpr_free(r->default_port); - grpc_iomgr_unregister_object(&r->iomgr_object); + cb(exec_ctx, arg, resolved); gpr_free(r); - cb(&exec_ctx, arg, resolved); - grpc_exec_ctx_finish(&exec_ctx); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { @@ -153,16 +155,12 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { void grpc_resolve_address(const char *name, const char *default_port, grpc_resolve_cb cb, void *arg) { request *r = gpr_malloc(sizeof(request)); - gpr_thd_id id; - char *label; - gpr_asprintf(&label, "resolve:%s", name); - grpc_iomgr_register_object(&r->iomgr_object, label); - gpr_free(label); + grpc_closure_init(&r->request_closure, do_request_thread, r); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->cb = cb; r->arg = arg; - gpr_thd_new(&id, do_request, r, NULL); + grpc_executor_enqueue(&r->request_closure, 1); } #endif diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 1a2aef64ef..df2774b527 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -37,6 +37,7 @@ #include <grpc/support/log.h> #include "src/core/channel/client_channel.h" +#include "src/core/channel/client_uchannel.h" #include "src/core/iomgr/timer.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/completion_queue.h" @@ -51,18 +52,24 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( GRPC_API_TRACE( "grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2, (channel, try_to_connect)); - if (client_channel_elem->filter != &grpc_client_channel_filter) { - gpr_log(GPR_ERROR, - "grpc_channel_check_connectivity_state called on something that is " - "not a client channel, but '%s'", - client_channel_elem->filter->name); + if (client_channel_elem->filter == &grpc_client_channel_filter) { + state = grpc_client_channel_check_connectivity_state( + &exec_ctx, client_channel_elem, try_to_connect); grpc_exec_ctx_finish(&exec_ctx); - return GRPC_CHANNEL_FATAL_FAILURE; + return state; } - state = grpc_client_channel_check_connectivity_state( - &exec_ctx, client_channel_elem, try_to_connect); + if (client_channel_elem->filter == &grpc_client_uchannel_filter) { + state = grpc_client_uchannel_check_connectivity_state( + &exec_ctx, client_channel_elem, try_to_connect); + grpc_exec_ctx_finish(&exec_ctx); + return state; + } + gpr_log(GPR_ERROR, + "grpc_channel_check_connectivity_state called on something that is " + "not a (u)client channel, but '%s'", + client_channel_elem->filter->name); grpc_exec_ctx_finish(&exec_ctx); - return state; + return GRPC_CHANNEL_FATAL_FAILURE; } typedef enum { @@ -87,7 +94,17 @@ typedef struct { } state_watcher; static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) { - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, "watch_connectivity"); + grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(w->channel)); + if (client_channel_elem->filter == &grpc_client_channel_filter) { + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, + "watch_channel_connectivity"); + } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) { + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, + "watch_uchannel_connectivity"); + } else { + abort(); + } gpr_mu_destroy(&w->mu); gpr_free(w); } @@ -125,8 +142,13 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, w->removed = 1; client_channel_elem = grpc_channel_stack_last_element( grpc_channel_get_channel_stack(w->channel)); - grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem, - grpc_cq_pollset(w->cq)); + if (client_channel_elem->filter == &grpc_client_channel_filter) { + grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem, + grpc_cq_pollset(w->cq)); + } else { + grpc_client_uchannel_del_interested_party(exec_ctx, client_channel_elem, + grpc_cq_pollset(w->cq)); + } } gpr_mu_unlock(&w->mu); if (due_to_completion) { @@ -199,18 +221,18 @@ void grpc_channel_watch_connectivity_state( gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC)); - if (client_channel_elem->filter != &grpc_client_channel_filter) { - gpr_log(GPR_ERROR, - "grpc_channel_watch_connectivity_state called on something that is " - "not a client channel, but '%s'", - client_channel_elem->filter->name); - grpc_exec_ctx_enqueue(&exec_ctx, &w->on_complete, 1); - } else { - GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); + if (client_channel_elem->filter == &grpc_client_channel_filter) { + GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); grpc_client_channel_add_interested_party(&exec_ctx, client_channel_elem, grpc_cq_pollset(cq)); grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem, &w->state, &w->on_complete); + } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) { + GRPC_CHANNEL_INTERNAL_REF(channel, "watch_uchannel_connectivity"); + grpc_client_uchannel_add_interested_party(&exec_ctx, client_channel_elem, + grpc_cq_pollset(cq)); + grpc_client_uchannel_watch_connectivity_state( + &exec_ctx, client_channel_elem, &w->state, &w->on_complete); } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 715c90a5e1..b2e66a830e 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -47,6 +47,7 @@ #include "src/core/client_config/resolvers/dns_resolver.h" #include "src/core/client_config/resolvers/sockaddr_resolver.h" #include "src/core/debug/trace.h" +#include "src/core/iomgr/executor.h" #include "src/core/iomgr/iomgr.h" #include "src/core/profiling/timers.h" #include "src/core/surface/api_trace.h" @@ -108,6 +109,7 @@ void grpc_init(void) { grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace); grpc_security_pre_init(); grpc_iomgr_init(); + grpc_executor_init(); grpc_tracer_init("GRPC_TRACE"); /* Only initialize census if noone else has. */ if (census_enabled() == CENSUS_FEATURE_NONE) { @@ -132,6 +134,7 @@ void grpc_shutdown(void) { gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { grpc_iomgr_shutdown(); + grpc_executor_shutdown(); census_shutdown(); gpr_timers_global_destroy(); grpc_tracer_shutdown(); |