aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/client_channel/client_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/client_channel/client_channel.c')
-rw-r--r--src/core/ext/client_channel/client_channel.c226
1 files changed, 151 insertions, 75 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 80b4f048c2..9d46338428 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -39,10 +39,12 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
#include "src/core/ext/client_channel/lb_policy_registry.h"
+#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
@@ -55,7 +57,7 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
-#include "src/core/lib/transport/method_config.h"
+#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
/* Client channel implementation */
@@ -81,30 +83,61 @@ static void *method_parameters_copy(void *value) {
return new_value;
}
-static int method_parameters_cmp(void *value1, void *value2) {
- const method_parameters *v1 = value1;
- const method_parameters *v2 = value2;
- const int retval = gpr_time_cmp(v1->timeout, v2->timeout);
- if (retval != 0) return retval;
- if (v1->wait_for_ready > v2->wait_for_ready) return 1;
- if (v1->wait_for_ready < v2->wait_for_ready) return -1;
- return 0;
-}
-
static const grpc_mdstr_hash_table_vtable method_parameters_vtable = {
- gpr_free, method_parameters_copy, method_parameters_cmp};
-
-static void *method_config_convert_value(
- const grpc_method_config *method_config) {
+ gpr_free, method_parameters_copy};
+
+static void *method_parameters_create_from_json(const grpc_json *json) {
+ wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
+ gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
+ for (grpc_json *field = json->child; field != NULL; field = field->next) {
+ if (field->key == NULL) continue;
+ if (strcmp(field->key, "waitForReady") == 0) {
+ if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
+ if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
+ return NULL;
+ }
+ wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
+ : WAIT_FOR_READY_FALSE;
+ } else if (strcmp(field->key, "timeout") == 0) {
+ if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate.
+ if (field->type != GRPC_JSON_STRING) return NULL;
+ size_t len = strlen(field->value);
+ if (field->value[len - 1] != 's') return NULL;
+ char *buf = gpr_strdup(field->value);
+ buf[len - 1] = '\0'; // Remove trailing 's'.
+ char *decimal_point = strchr(buf, '.');
+ if (decimal_point != NULL) {
+ *decimal_point = '\0';
+ timeout.tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
+ if (timeout.tv_nsec == -1) {
+ gpr_free(buf);
+ return NULL;
+ }
+ // There should always be exactly 3, 6, or 9 fractional digits.
+ int multiplier = 1;
+ switch (strlen(decimal_point + 1)) {
+ case 9:
+ break;
+ case 6:
+ multiplier *= 1000;
+ break;
+ case 3:
+ multiplier *= 1000000;
+ break;
+ default: // Unsupported number of digits.
+ gpr_free(buf);
+ return NULL;
+ }
+ timeout.tv_nsec *= multiplier;
+ }
+ timeout.tv_sec = gpr_parse_nonnegative_int(buf);
+ if (timeout.tv_sec == -1) return NULL;
+ gpr_free(buf);
+ }
+ }
method_parameters *value = gpr_malloc(sizeof(method_parameters));
- const gpr_timespec *timeout = grpc_method_config_get_timeout(method_config);
- value->timeout = timeout != NULL ? *timeout : gpr_time_0(GPR_TIMESPAN);
- const bool *wait_for_ready =
- grpc_method_config_get_wait_for_ready(method_config);
- value->wait_for_ready =
- wait_for_ready == NULL
- ? WAIT_FOR_READY_UNSET
- : (wait_for_ready ? WAIT_FOR_READY_TRUE : WAIT_FOR_READY_FALSE);
+ value->timeout = timeout;
+ value->wait_for_ready = wait_for_ready;
return value;
}
@@ -123,7 +156,10 @@ typedef struct client_channel_channel_data {
/** mutex protecting all variables below in this data structure */
gpr_mu mu;
/** currently active load balancer */
+ char *lb_policy_name;
grpc_lb_policy *lb_policy;
+ /** service config in JSON form */
+ char *service_config_json;
/** maps method names to method_parameters structs */
grpc_mdstr_hash_table *method_params_table;
/** incoming resolver result - set by resolver.next() */
@@ -223,22 +259,19 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
channel_data *chand = arg;
+ char *lb_policy_name = NULL;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
grpc_mdstr_hash_table *method_params_table = NULL;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
bool exit_idle = false;
grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
+ char *service_config_json = NULL;
if (chand->resolver_result != NULL) {
- grpc_lb_policy_args lb_policy_args;
- lb_policy_args.args = chand->resolver_result;
- lb_policy_args.client_channel_factory = chand->client_channel_factory;
-
// Find LB policy name.
- const char *lb_policy_name = NULL;
const grpc_arg *channel_arg =
- grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_POLICY_NAME);
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
if (channel_arg != NULL) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
lb_policy_name = channel_arg->value.string;
@@ -247,7 +280,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
// assume that we should use the grpclb policy, regardless of what the
// resolver actually specified.
channel_arg =
- grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_ADDRESSES);
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
if (channel_arg != NULL) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
@@ -272,7 +305,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (lb_policy_name == NULL) lb_policy_name = "pick_first";
-
+ // Instantiate LB policy.
+ grpc_lb_policy_args lb_policy_args;
+ lb_policy_args.args = chand->resolver_result;
+ lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
@@ -281,14 +317,25 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
state =
grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
+ // Find service config.
channel_arg =
- grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_SERVICE_CONFIG);
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
if (channel_arg != NULL) {
- GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
- method_params_table = grpc_method_config_table_convert(
- (grpc_method_config_table *)channel_arg->value.pointer.p,
- method_config_convert_value, &method_parameters_vtable);
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+ service_config_json = gpr_strdup(channel_arg->value.string);
+ grpc_service_config *service_config =
+ grpc_service_config_create(service_config_json);
+ if (service_config != NULL) {
+ method_params_table = grpc_service_config_create_method_config_table(
+ service_config, method_parameters_create_from_json,
+ &method_parameters_vtable);
+ grpc_service_config_destroy(service_config);
+ }
}
+ // Before we clean up, save a copy of lb_policy_name, since it might
+ // be pointing to data inside chand->resolver_result.
+ // The copy will be saved in chand->lb_policy_name below.
+ lb_policy_name = gpr_strdup(lb_policy_name);
grpc_channel_args_destroy(chand->resolver_result);
chand->resolver_result = NULL;
}
@@ -299,8 +346,16 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
gpr_mu_lock(&chand->mu);
+ if (lb_policy_name != NULL) {
+ gpr_free(chand->lb_policy_name);
+ chand->lb_policy_name = lb_policy_name;
+ }
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
+ if (service_config_json != NULL) {
+ gpr_free(chand->service_config_json);
+ chand->service_config_json = service_config_json;
+ }
if (chand->method_params_table != NULL) {
grpc_mdstr_hash_table_unref(chand->method_params_table);
}
@@ -426,25 +481,58 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&chand->mu);
}
-/* Constructor for channel_data */
-static void cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_channel_element_args *args) {
+static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ const grpc_channel_info *info) {
channel_data *chand = elem->channel_data;
+ gpr_mu_lock(&chand->mu);
+ if (info->lb_policy_name != NULL) {
+ *info->lb_policy_name = chand->lb_policy_name == NULL
+ ? NULL
+ : gpr_strdup(chand->lb_policy_name);
+ }
+ if (info->service_config_json != NULL) {
+ *info->service_config_json = chand->service_config_json == NULL
+ ? NULL
+ : gpr_strdup(chand->service_config_json);
+ }
+ gpr_mu_unlock(&chand->mu);
+}
+/* Constructor for channel_data */
+static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
+ channel_data *chand = elem->channel_data;
memset(chand, 0, sizeof(*chand));
-
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
-
+ // Initialize data members.
gpr_mu_init(&chand->mu);
+ chand->owning_stack = args->channel_stack;
grpc_closure_init(&chand->on_resolver_result_changed,
on_resolver_result_changed, chand);
- chand->owning_stack = args->channel_stack;
-
+ chand->interested_parties = grpc_pollset_set_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
- chand->interested_parties = grpc_pollset_set_create();
+ // Record client channel factory.
+ const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
+ GRPC_ARG_CLIENT_CHANNEL_FACTORY);
+ GPR_ASSERT(arg != NULL);
+ GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
+ grpc_client_channel_factory_ref(arg->value.pointer.p);
+ chand->client_channel_factory = arg->value.pointer.p;
+ // Instantiate resolver.
+ arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
+ GPR_ASSERT(arg != NULL);
+ GPR_ASSERT(arg->type == GRPC_ARG_STRING);
+ chand->resolver =
+ grpc_resolver_create(exec_ctx, arg->value.string, args->channel_args,
+ chand->interested_parties);
+ if (chand->resolver == NULL) {
+ return GRPC_ERROR_CREATE("resolver creation failed");
+ }
+ return GRPC_ERROR_NONE;
}
/* Destructor for channel_data */
@@ -465,6 +553,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
+ gpr_free(chand->lb_policy_name);
+ gpr_free(chand->service_config_json);
if (chand->method_params_table != NULL) {
grpc_mdstr_hash_table_unref(chand->method_params_table);
}
@@ -609,15 +699,21 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
"Failed to create subchannel", &error, 1));
} else if (GET_CALL(calld) == CANCELLED_CALL) {
/* already cancelled before subchannel became ready */
- fail_locked(exec_ctx, calld,
- GRPC_ERROR_CREATE_REFERENCING(
- "Cancelled before creating subchannel", &error, 1));
+ grpc_error *cancellation_error = GRPC_ERROR_CREATE_REFERENCING(
+ "Cancelled before creating subchannel", &error, 1);
+ /* if due to deadline, attach the deadline exceeded status to the error */
+ if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
+ cancellation_error =
+ grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_DEADLINE_EXCEEDED);
+ }
+ fail_locked(exec_ctx, calld, cancellation_error);
} else {
/* Create call on subchannel. */
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
- calld->deadline, &subchannel_call);
+ calld->call_start_time, calld->deadline, &subchannel_call);
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
subchannel_call = CANCELLED_CALL;
@@ -735,7 +831,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
}
}
- // TODO(dgq): make this deadline configurable somehow.
const grpc_lb_policy_pick_args inputs = {
initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
gpr_inf_future(GPR_CLOCK_MONOTONIC)};
@@ -870,7 +965,7 @@ retry:
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
- calld->deadline, &subchannel_call);
+ calld->call_start_time, calld->deadline, &subchannel_call);
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
@@ -1022,6 +1117,10 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
gpr_mu_destroy(&calld->mu);
GPR_ASSERT(calld->waiting_ops_count == 0);
+ if (calld->connected_subchannel != NULL) {
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
+ "picked");
+ }
gpr_free(calld->waiting_ops);
gpr_free(and_free_memory);
}
@@ -1048,33 +1147,10 @@ const grpc_channel_filter grpc_client_channel_filter = {
cc_init_channel_elem,
cc_destroy_channel_elem,
cc_get_peer,
+ cc_get_channel_info,
"client-channel",
};
-void grpc_client_channel_finish_initialization(
- grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
- grpc_resolver *resolver,
- grpc_client_channel_factory *client_channel_factory) {
- /* post construction initialization: set the transport setup pointer */
- GPR_ASSERT(client_channel_factory != NULL);
- grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->mu);
- GPR_ASSERT(!chand->resolver);
- chand->resolver = resolver;
- GRPC_RESOLVER_REF(resolver, "channel");
- if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
- chand->exit_idle_when_lb_policy_arrives) {
- chand->started_resolving = true;
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result,
- &chand->on_resolver_result_changed);
- }
- chand->client_channel_factory = client_channel_factory;
- grpc_client_channel_factory_ref(client_channel_factory);
- gpr_mu_unlock(&chand->mu);
-}
-
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
channel_data *chand = elem->channel_data;