diff options
author | Craig Tiller <ctiller@google.com> | 2016-04-05 12:44:04 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-04-05 12:44:04 -0700 |
commit | b112f3989fb5cd45b9e4b53093d0c3d5f4d98c97 (patch) | |
tree | 8ae3e20b6f5ca6880d179d216bfa72f3c79d8b55 /src/core/ext/client_config | |
parent | 52c85cecf365fb534827ac15780d803993693c46 (diff) |
Move subchannel_call_holder
Diffstat (limited to 'src/core/ext/client_config')
-rw-r--r-- | src/core/ext/client_config/client_channel.c | 2 | ||||
-rw-r--r-- | src/core/ext/client_config/subchannel_call_holder.c | 260 | ||||
-rw-r--r-- | src/core/ext/client_config/subchannel_call_holder.h | 98 |
3 files changed, 359 insertions, 1 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 3b2f7b8785..b1c26dc342 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -43,7 +43,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" -#include "src/core/lib/channel/subchannel_call_holder.h" +#include "src/core/ext/client_config/subchannel_call_holder.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c new file mode 100644 index 0000000000..3db462b246 --- /dev/null +++ b/src/core/ext/client_config/subchannel_call_holder.c @@ -0,0 +1,260 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/client_config/subchannel_call_holder.h" + +#include <grpc/support/alloc.h> + +#include "src/core/lib/profiling/timers.h" + +#define GET_CALL(holder) \ + ((grpc_subchannel_call *)(gpr_atm_acq_load(&(holder)->subchannel_call))) + +#define CANCELLED_CALL ((grpc_subchannel_call *)1) + +static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder, + bool success); +static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args, + bool success); + +static void add_waiting_locked(grpc_subchannel_call_holder *holder, + grpc_transport_stream_op *op); +static void fail_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder); +static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder); + +void grpc_subchannel_call_holder_init( + grpc_subchannel_call_holder *holder, + grpc_subchannel_call_holder_pick_subchannel pick_subchannel, + void *pick_subchannel_arg, grpc_call_stack *owning_call) { + gpr_atm_rel_store(&holder->subchannel_call, 0); + holder->pick_subchannel = pick_subchannel; + holder->pick_subchannel_arg = pick_subchannel_arg; + gpr_mu_init(&holder->mu); + holder->connected_subchannel = NULL; + holder->waiting_ops = NULL; + holder->waiting_ops_count = 0; + holder->waiting_ops_capacity = 0; + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + holder->owning_call = owning_call; +} + +void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder) { + grpc_subchannel_call *call = GET_CALL(holder); + if (call != NULL && call != CANCELLED_CALL) { + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "holder"); + } + GPR_ASSERT(holder->creation_phase == + GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); + gpr_mu_destroy(&holder->mu); + GPR_ASSERT(holder->waiting_ops_count == 0); + gpr_free(holder->waiting_ops); +} + +void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder, + grpc_transport_stream_op *op) { + /* try to (atomically) get the call */ + grpc_subchannel_call *call = GET_CALL(holder); + GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0); + if (call == CANCELLED_CALL) { + grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + if (call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, call, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + /* we failed; lock and figure out what to do */ + gpr_mu_lock(&holder->mu); +retry: + /* need to recheck that another thread hasn't set the call */ + call = GET_CALL(holder); + if (call == CANCELLED_CALL) { + gpr_mu_unlock(&holder->mu); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + if (call != NULL) { + gpr_mu_unlock(&holder->mu); + grpc_subchannel_call_process_op(exec_ctx, call, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + /* if this is a cancellation, then we can raise our cancelled flag */ + if (op->cancel_with_status != GRPC_STATUS_OK) { + if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) { + goto retry; + } else { + switch (holder->creation_phase) { + case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: + fail_locked(exec_ctx, holder); + break; + case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: + holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL, + 0, &holder->connected_subchannel, NULL); + break; + } + gpr_mu_unlock(&holder->mu); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + } + /* if we don't have a subchannel, try to get one */ + if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && + holder->connected_subchannel == NULL && + op->send_initial_metadata != NULL) { + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; + grpc_closure_init(&holder->next_step, subchannel_ready, holder); + GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel"); + if (holder->pick_subchannel( + exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata, + op->send_initial_metadata_flags, &holder->connected_subchannel, + &holder->next_step)) { + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel"); + } + } + /* if we've got a subchannel, then let's ask it to create a call */ + if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && + holder->connected_subchannel != NULL) { + gpr_atm_rel_store( + &holder->subchannel_call, + (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call( + exec_ctx, holder->connected_subchannel, holder->pollset)); + retry_waiting_locked(exec_ctx, holder); + goto retry; + } + /* nothing to be done but wait */ + add_waiting_locked(holder, op); + gpr_mu_unlock(&holder->mu); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); +} + +static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + grpc_subchannel_call_holder *holder = arg; + gpr_mu_lock(&holder->mu); + GPR_ASSERT(holder->creation_phase == + GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + if (holder->connected_subchannel == NULL) { + fail_locked(exec_ctx, holder); + } else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) { + /* already cancelled before subchannel became ready */ + fail_locked(exec_ctx, holder); + } else { + gpr_atm_rel_store( + &holder->subchannel_call, + (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call( + exec_ctx, holder->connected_subchannel, holder->pollset)); + retry_waiting_locked(exec_ctx, holder); + } + gpr_mu_unlock(&holder->mu); + GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel"); +} + +typedef struct { + grpc_transport_stream_op *ops; + size_t nops; + grpc_subchannel_call *call; +} retry_ops_args; + +static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder) { + retry_ops_args *a = gpr_malloc(sizeof(*a)); + a->ops = holder->waiting_ops; + a->nops = holder->waiting_ops_count; + a->call = GET_CALL(holder); + if (a->call == CANCELLED_CALL) { + gpr_free(a); + fail_locked(exec_ctx, holder); + return; + } + holder->waiting_ops = NULL; + holder->waiting_ops_count = 0; + holder->waiting_ops_capacity = 0; + GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), true, + NULL); +} + +static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, bool success) { + retry_ops_args *a = args; + size_t i; + for (i = 0; i < a->nops; i++) { + grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]); + } + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); + gpr_free(a->ops); + gpr_free(a); +} + +static void add_waiting_locked(grpc_subchannel_call_holder *holder, + grpc_transport_stream_op *op) { + GPR_TIMER_BEGIN("add_waiting_locked", 0); + if (holder->waiting_ops_count == holder->waiting_ops_capacity) { + holder->waiting_ops_capacity = GPR_MAX(3, 2 * holder->waiting_ops_capacity); + holder->waiting_ops = + gpr_realloc(holder->waiting_ops, holder->waiting_ops_capacity * + sizeof(*holder->waiting_ops)); + } + holder->waiting_ops[holder->waiting_ops_count++] = *op; + GPR_TIMER_END("add_waiting_locked", 0); +} + +static void fail_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder) { + size_t i; + for (i = 0; i < holder->waiting_ops_count; i++) { + grpc_transport_stream_op_finish_with_failure(exec_ctx, + &holder->waiting_ops[i]); + } + holder->waiting_ops_count = 0; +} + +char *grpc_subchannel_call_holder_get_peer( + grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) { + grpc_subchannel_call *subchannel_call = GET_CALL(holder); + + if (subchannel_call) { + return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); + } else { + return NULL; + } +} diff --git a/src/core/ext/client_config/subchannel_call_holder.h b/src/core/ext/client_config/subchannel_call_holder.h new file mode 100644 index 0000000000..2107a06cd9 --- /dev/null +++ b/src/core/ext/client_config/subchannel_call_holder.h @@ -0,0 +1,98 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H +#define GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H + +#include "src/core/ext/client_config/subchannel.h" + +/** Pick a subchannel for grpc_subchannel_call_holder; + Return 1 if subchannel is available immediately (in which case on_ready + should not be called), or 0 otherwise (in which case on_ready should be + called when the subchannel is available) */ +typedef int (*grpc_subchannel_call_holder_pick_subchannel)( + grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, + uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready); + +typedef enum { + GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING, + GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL +} grpc_subchannel_call_holder_creation_phase; + +/** Wrapper for holding a pointer to grpc_subchannel_call, and the + associated machinery to create such a pointer. + Handles queueing of stream ops until a call object is ready, waiting + for initial metadata before trying to create a call object, + and handling cancellation gracefully. + + The channel filter uses this as their call_data. */ +typedef struct grpc_subchannel_call_holder { + /** either 0 for no call, 1 for cancelled, or a pointer to a + grpc_subchannel_call */ + gpr_atm subchannel_call; + /** Helper function to choose the subchannel on which to create + the call object. Channel filter delegates to the load + balancing policy (once it's ready). */ + grpc_subchannel_call_holder_pick_subchannel pick_subchannel; + void *pick_subchannel_arg; + + gpr_mu mu; + + grpc_subchannel_call_holder_creation_phase creation_phase; + grpc_connected_subchannel *connected_subchannel; + grpc_pollset *pollset; + + grpc_transport_stream_op *waiting_ops; + size_t waiting_ops_count; + size_t waiting_ops_capacity; + + grpc_closure next_step; + + grpc_call_stack *owning_call; +} grpc_subchannel_call_holder; + +void grpc_subchannel_call_holder_init( + grpc_subchannel_call_holder *holder, + grpc_subchannel_call_holder_pick_subchannel pick_subchannel, + void *pick_subchannel_arg, grpc_call_stack *owning_call); +void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder); + +void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder, + grpc_transport_stream_op *op); +char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder); + +#endif /* GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H */ |