aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/client_config/client_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/client_config/client_channel.c')
-rw-r--r--src/core/ext/client_config/client_channel.c123
1 files changed, 89 insertions, 34 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index fed3ae0450..c693c95a03 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -42,9 +42,11 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
@@ -63,6 +65,8 @@ typedef struct client_channel_channel_data {
grpc_resolver *resolver;
/** have we started resolving this channel */
bool started_resolving;
+ /** client channel factory */
+ grpc_client_channel_factory *client_channel_factory;
/** mutex protecting client configuration, including all
variables below in this data structure */
@@ -111,7 +115,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_cancel_picks(
exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
- /* check= */ 0);
+ /* check= */ 0, GRPC_ERROR_REF(error));
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
@@ -173,20 +177,28 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
if (chand->resolver_result != NULL) {
- lb_policy = grpc_resolver_result_get_lb_policy(chand->resolver_result);
+ grpc_lb_policy_args lb_policy_args;
+ lb_policy_args.server_name =
+ grpc_resolver_result_get_server_name(chand->resolver_result);
+ lb_policy_args.addresses =
+ grpc_resolver_result_get_addresses(chand->resolver_result);
+ lb_policy_args.additional_args =
+ grpc_resolver_result_get_lb_policy_args(chand->resolver_result);
+ lb_policy_args.client_channel_factory = chand->client_channel_factory;
+ lb_policy = grpc_lb_policy_create(
+ exec_ctx,
+ grpc_resolver_result_get_lb_policy_name(chand->resolver_result),
+ &lb_policy_args);
if (lb_policy != NULL) {
- GRPC_LB_POLICY_REF(lb_policy, "channel");
GRPC_LB_POLICY_REF(lb_policy, "config_change");
GRPC_ERROR_UNREF(state_error);
state =
grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
-
grpc_resolver_result_unref(exec_ctx, chand->resolver_result);
+ chand->resolver_result = NULL;
}
- chand->resolver_result = NULL;
-
if (lb_policy != NULL) {
grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
chand->interested_parties);
@@ -346,6 +358,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
}
+ if (chand->client_channel_factory != NULL) {
+ grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
+ }
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
chand->lb_policy->interested_parties,
@@ -377,6 +392,17 @@ typedef enum {
for initial metadata before trying to create a call object,
and handling cancellation gracefully. */
typedef struct client_channel_call_data {
+ // State for handling deadlines.
+ // The code in deadline_filter.c requires this to be the first field.
+ // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
+ // and this struct both independently store a pointer to the call
+ // stack and each has its own mutex. If/when we have time, find a way
+ // to avoid this without breaking the grpc_deadline_state abstraction.
+ grpc_deadline_state deadline_state;
+ gpr_timespec deadline;
+
+ grpc_error *cancel_error;
+
/** either 0 for no call, 1 for cancelled, or a pointer to a
grpc_subchannel_call */
gpr_atm subchannel_call;
@@ -387,13 +413,15 @@ typedef struct client_channel_call_data {
grpc_connected_subchannel *connected_subchannel;
grpc_polling_entity *pollent;
- grpc_transport_stream_op *waiting_ops;
+ grpc_transport_stream_op **waiting_ops;
size_t waiting_ops_count;
size_t waiting_ops_capacity;
grpc_closure next_step;
grpc_call_stack *owning_call;
+
+ grpc_linked_mdelem lb_token_mdelem;
} call_data;
static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
@@ -404,7 +432,7 @@ static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
gpr_realloc(calld->waiting_ops,
calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
}
- calld->waiting_ops[calld->waiting_ops_count++] = *op;
+ calld->waiting_ops[calld->waiting_ops_count++] = op;
GPR_TIMER_END("add_waiting_locked", 0);
}
@@ -413,14 +441,14 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
size_t i;
for (i = 0; i < calld->waiting_ops_count; i++) {
grpc_transport_stream_op_finish_with_failure(
- exec_ctx, &calld->waiting_ops[i], GRPC_ERROR_REF(error));
+ exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
}
calld->waiting_ops_count = 0;
GRPC_ERROR_UNREF(error);
}
typedef struct {
- grpc_transport_stream_op *ops;
+ grpc_transport_stream_op **ops;
size_t nops;
grpc_subchannel_call *call;
} retry_ops_args;
@@ -429,7 +457,7 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
retry_ops_args *a = args;
size_t i;
for (i = 0; i < a->nops; i++) {
- grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]);
+ grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]);
}
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
gpr_free(a->ops);
@@ -437,6 +465,10 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
}
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
+ if (calld->waiting_ops_count == 0) {
+ return;
+ }
+
retry_ops_args *a = gpr_malloc(sizeof(*a));
a->ops = calld->waiting_ops;
a->nops = calld->waiting_ops_count;
@@ -465,7 +497,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING(
"Failed to create subchannel", &error, 1));
- } else if (1 == gpr_atm_acq_load(&calld->subchannel_call)) {
+ } else if (GET_CALL(calld) == CANCELLED_CALL) {
/* already cancelled before subchannel became ready */
fail_locked(exec_ctx, calld,
GRPC_ERROR_CREATE_REFERENCING(
@@ -473,7 +505,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
} else {
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *new_error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent,
+ exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
&subchannel_call);
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
@@ -511,7 +543,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
- grpc_closure *on_ready);
+ grpc_closure *on_ready, grpc_error *error);
static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
@@ -522,7 +554,8 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
} else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
- cpa->connected_subchannel, cpa->on_ready)) {
+ cpa->connected_subchannel, cpa->on_ready,
+ GRPC_ERROR_NONE)) {
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
}
gpr_free(cpa);
@@ -532,7 +565,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
- grpc_closure *on_ready) {
+ grpc_closure *on_ready, grpc_error *error) {
GPR_TIMER_BEGIN("pick_subchannel", 0);
channel_data *chand = elem->channel_data;
@@ -546,29 +579,34 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
if (initial_metadata == NULL) {
if (chand->lb_policy != NULL) {
grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
- connected_subchannel);
+ connected_subchannel, GRPC_ERROR_REF(error));
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
closure = closure->next_data.next) {
cpa = closure->cb_arg;
if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL;
- grpc_exec_ctx_sched(exec_ctx, cpa->on_ready,
- GRPC_ERROR_CREATE("Pick cancelled"), NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, cpa->on_ready,
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
}
}
gpr_mu_unlock(&chand->mu);
GPR_TIMER_END("pick_subchannel", 0);
+ GRPC_ERROR_UNREF(error);
return true;
}
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
if (chand->lb_policy != NULL) {
grpc_lb_policy *lb_policy = chand->lb_policy;
int r;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
gpr_mu_unlock(&chand->mu);
- r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent,
- initial_metadata, initial_metadata_flags,
- connected_subchannel, on_ready);
+ const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
+ initial_metadata_flags,
+ &calld->lb_token_mdelem};
+ r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
+ NULL, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
return r;
@@ -610,12 +648,13 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
/* try to (atomically) get the call */
grpc_subchannel_call *call = GET_CALL(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
if (call == CANCELLED_CALL) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
- GRPC_ERROR_CANCELLED);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@@ -631,8 +670,8 @@ retry:
call = GET_CALL(calld);
if (call == CANCELLED_CALL) {
gpr_mu_unlock(&calld->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
- GRPC_ERROR_CANCELLED);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@@ -648,18 +687,24 @@ retry:
(gpr_atm)(uintptr_t)CANCELLED_CALL)) {
goto retry;
} else {
+ // Stash a copy of cancel_error in our call data, so that we can use
+ // it for subsequent operations. This ensures that if the call is
+ // cancelled before any ops are passed down (e.g., if the deadline
+ // is in the past when the call starts), we can return the right
+ // error to the caller when the first op does get passed down.
+ calld->cancel_error = GRPC_ERROR_REF(op->cancel_error);
switch (calld->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel,
- NULL);
+ NULL, GRPC_ERROR_REF(op->cancel_error));
break;
}
gpr_mu_unlock(&calld->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
- GRPC_ERROR_CANCELLED);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(op->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@@ -673,7 +718,8 @@ retry:
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
op->send_initial_metadata_flags,
- &calld->connected_subchannel, &calld->next_step)) {
+ &calld->connected_subchannel, &calld->next_step,
+ GRPC_ERROR_NONE)) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
}
@@ -683,7 +729,7 @@ retry:
calld->connected_subchannel != NULL) {
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent,
+ exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
&subchannel_call);
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
@@ -706,6 +752,9 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
+ grpc_deadline_state_init(exec_ctx, elem, args);
+ calld->deadline = args->deadline;
+ calld->cancel_error = GRPC_ERROR_NONE;
gpr_atm_rel_store(&calld->subchannel_call, 0);
gpr_mu_init(&calld->mu);
calld->connected_subchannel = NULL;
@@ -724,6 +773,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_final_info *final_info,
void *and_free_memory) {
call_data *calld = elem->call_data;
+ grpc_deadline_state_destroy(exec_ctx, elem);
+ GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
@@ -760,10 +811,12 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};
-void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
- grpc_resolver *resolver) {
+void grpc_client_channel_finish_initialization(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver,
+ grpc_client_channel_factory *client_channel_factory) {
/* post construction initialization: set the transport setup pointer */
+ GPR_ASSERT(client_channel_factory != NULL);
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu);
@@ -778,6 +831,8 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
// grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result,
// &chand->on_resolver_result_changed);
// }
+ chand->client_channel_factory = client_channel_factory;
+ grpc_client_channel_factory_ref(client_channel_factory);
gpr_mu_unlock(&chand->mu);
}