aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/client_config
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/client_config')
-rw-r--r--src/core/ext/client_config/client_channel.c123
-rw-r--r--src/core/ext/client_config/client_channel.h13
-rw-r--r--src/core/ext/client_config/client_config_plugin.c6
-rw-r--r--src/core/ext/client_config/http_connect_handshaker.c275
-rw-r--r--src/core/ext/client_config/http_connect_handshaker.h (renamed from src/core/ext/client_config/subchannel_factory.c)26
-rw-r--r--src/core/ext/client_config/lb_policy.c20
-rw-r--r--src/core/ext/client_config/lb_policy.h70
-rw-r--r--src/core/ext/client_config/lb_policy_factory.c58
-rw-r--r--src/core/ext/client_config/lb_policy_factory.h49
-rw-r--r--src/core/ext/client_config/resolver_factory.h5
-rw-r--r--src/core/ext/client_config/resolver_registry.c49
-rw-r--r--src/core/ext/client_config/resolver_registry.h8
-rw-r--r--src/core/ext/client_config/resolver_result.c129
-rw-r--r--src/core/ext/client_config/resolver_result.h106
-rw-r--r--src/core/ext/client_config/subchannel.c85
-rw-r--r--src/core/ext/client_config/subchannel.h5
-rw-r--r--src/core/ext/client_config/subchannel_factory.h66
-rw-r--r--src/core/ext/client_config/subchannel_index.c7
18 files changed, 768 insertions, 332 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index fed3ae0450..c693c95a03 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -42,9 +42,11 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
@@ -63,6 +65,8 @@ typedef struct client_channel_channel_data {
grpc_resolver *resolver;
/** have we started resolving this channel */
bool started_resolving;
+ /** client channel factory */
+ grpc_client_channel_factory *client_channel_factory;
/** mutex protecting client configuration, including all
variables below in this data structure */
@@ -111,7 +115,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_cancel_picks(
exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
- /* check= */ 0);
+ /* check= */ 0, GRPC_ERROR_REF(error));
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
@@ -173,20 +177,28 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
if (chand->resolver_result != NULL) {
- lb_policy = grpc_resolver_result_get_lb_policy(chand->resolver_result);
+ grpc_lb_policy_args lb_policy_args;
+ lb_policy_args.server_name =
+ grpc_resolver_result_get_server_name(chand->resolver_result);
+ lb_policy_args.addresses =
+ grpc_resolver_result_get_addresses(chand->resolver_result);
+ lb_policy_args.additional_args =
+ grpc_resolver_result_get_lb_policy_args(chand->resolver_result);
+ lb_policy_args.client_channel_factory = chand->client_channel_factory;
+ lb_policy = grpc_lb_policy_create(
+ exec_ctx,
+ grpc_resolver_result_get_lb_policy_name(chand->resolver_result),
+ &lb_policy_args);
if (lb_policy != NULL) {
- GRPC_LB_POLICY_REF(lb_policy, "channel");
GRPC_LB_POLICY_REF(lb_policy, "config_change");
GRPC_ERROR_UNREF(state_error);
state =
grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
-
grpc_resolver_result_unref(exec_ctx, chand->resolver_result);
+ chand->resolver_result = NULL;
}
- chand->resolver_result = NULL;
-
if (lb_policy != NULL) {
grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
chand->interested_parties);
@@ -346,6 +358,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
}
+ if (chand->client_channel_factory != NULL) {
+ grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
+ }
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
chand->lb_policy->interested_parties,
@@ -377,6 +392,17 @@ typedef enum {
for initial metadata before trying to create a call object,
and handling cancellation gracefully. */
typedef struct client_channel_call_data {
+ // State for handling deadlines.
+ // The code in deadline_filter.c requires this to be the first field.
+ // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
+ // and this struct both independently store a pointer to the call
+ // stack and each has its own mutex. If/when we have time, find a way
+ // to avoid this without breaking the grpc_deadline_state abstraction.
+ grpc_deadline_state deadline_state;
+ gpr_timespec deadline;
+
+ grpc_error *cancel_error;
+
/** either 0 for no call, 1 for cancelled, or a pointer to a
grpc_subchannel_call */
gpr_atm subchannel_call;
@@ -387,13 +413,15 @@ typedef struct client_channel_call_data {
grpc_connected_subchannel *connected_subchannel;
grpc_polling_entity *pollent;
- grpc_transport_stream_op *waiting_ops;
+ grpc_transport_stream_op **waiting_ops;
size_t waiting_ops_count;
size_t waiting_ops_capacity;
grpc_closure next_step;
grpc_call_stack *owning_call;
+
+ grpc_linked_mdelem lb_token_mdelem;
} call_data;
static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
@@ -404,7 +432,7 @@ static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
gpr_realloc(calld->waiting_ops,
calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
}
- calld->waiting_ops[calld->waiting_ops_count++] = *op;
+ calld->waiting_ops[calld->waiting_ops_count++] = op;
GPR_TIMER_END("add_waiting_locked", 0);
}
@@ -413,14 +441,14 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
size_t i;
for (i = 0; i < calld->waiting_ops_count; i++) {
grpc_transport_stream_op_finish_with_failure(
- exec_ctx, &calld->waiting_ops[i], GRPC_ERROR_REF(error));
+ exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
}
calld->waiting_ops_count = 0;
GRPC_ERROR_UNREF(error);
}
typedef struct {
- grpc_transport_stream_op *ops;
+ grpc_transport_stream_op **ops;
size_t nops;
grpc_subchannel_call *call;
} retry_ops_args;
@@ -429,7 +457,7 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
retry_ops_args *a = args;
size_t i;
for (i = 0; i < a->nops; i++) {
- grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]);
+ grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]);
}
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
gpr_free(a->ops);
@@ -437,6 +465,10 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
}
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
+ if (calld->waiting_ops_count == 0) {
+ return;
+ }
+
retry_ops_args *a = gpr_malloc(sizeof(*a));
a->ops = calld->waiting_ops;
a->nops = calld->waiting_ops_count;
@@ -465,7 +497,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING(
"Failed to create subchannel", &error, 1));
- } else if (1 == gpr_atm_acq_load(&calld->subchannel_call)) {
+ } else if (GET_CALL(calld) == CANCELLED_CALL) {
/* already cancelled before subchannel became ready */
fail_locked(exec_ctx, calld,
GRPC_ERROR_CREATE_REFERENCING(
@@ -473,7 +505,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
} else {
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *new_error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent,
+ exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
&subchannel_call);
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
@@ -511,7 +543,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
- grpc_closure *on_ready);
+ grpc_closure *on_ready, grpc_error *error);
static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
@@ -522,7 +554,8 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
} else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
- cpa->connected_subchannel, cpa->on_ready)) {
+ cpa->connected_subchannel, cpa->on_ready,
+ GRPC_ERROR_NONE)) {
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
}
gpr_free(cpa);
@@ -532,7 +565,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
- grpc_closure *on_ready) {
+ grpc_closure *on_ready, grpc_error *error) {
GPR_TIMER_BEGIN("pick_subchannel", 0);
channel_data *chand = elem->channel_data;
@@ -546,29 +579,34 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
if (initial_metadata == NULL) {
if (chand->lb_policy != NULL) {
grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
- connected_subchannel);
+ connected_subchannel, GRPC_ERROR_REF(error));
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
closure = closure->next_data.next) {
cpa = closure->cb_arg;
if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL;
- grpc_exec_ctx_sched(exec_ctx, cpa->on_ready,
- GRPC_ERROR_CREATE("Pick cancelled"), NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, cpa->on_ready,
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
}
}
gpr_mu_unlock(&chand->mu);
GPR_TIMER_END("pick_subchannel", 0);
+ GRPC_ERROR_UNREF(error);
return true;
}
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
if (chand->lb_policy != NULL) {
grpc_lb_policy *lb_policy = chand->lb_policy;
int r;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
gpr_mu_unlock(&chand->mu);
- r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent,
- initial_metadata, initial_metadata_flags,
- connected_subchannel, on_ready);
+ const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
+ initial_metadata_flags,
+ &calld->lb_token_mdelem};
+ r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
+ NULL, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
return r;
@@ -610,12 +648,13 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
/* try to (atomically) get the call */
grpc_subchannel_call *call = GET_CALL(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
if (call == CANCELLED_CALL) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
- GRPC_ERROR_CANCELLED);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@@ -631,8 +670,8 @@ retry:
call = GET_CALL(calld);
if (call == CANCELLED_CALL) {
gpr_mu_unlock(&calld->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
- GRPC_ERROR_CANCELLED);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@@ -648,18 +687,24 @@ retry:
(gpr_atm)(uintptr_t)CANCELLED_CALL)) {
goto retry;
} else {
+ // Stash a copy of cancel_error in our call data, so that we can use
+ // it for subsequent operations. This ensures that if the call is
+ // cancelled before any ops are passed down (e.g., if the deadline
+ // is in the past when the call starts), we can return the right
+ // error to the caller when the first op does get passed down.
+ calld->cancel_error = GRPC_ERROR_REF(op->cancel_error);
switch (calld->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel,
- NULL);
+ NULL, GRPC_ERROR_REF(op->cancel_error));
break;
}
gpr_mu_unlock(&calld->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
- GRPC_ERROR_CANCELLED);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(op->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@@ -673,7 +718,8 @@ retry:
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
op->send_initial_metadata_flags,
- &calld->connected_subchannel, &calld->next_step)) {
+ &calld->connected_subchannel, &calld->next_step,
+ GRPC_ERROR_NONE)) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
}
@@ -683,7 +729,7 @@ retry:
calld->connected_subchannel != NULL) {
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent,
+ exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
&subchannel_call);
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
@@ -706,6 +752,9 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
+ grpc_deadline_state_init(exec_ctx, elem, args);
+ calld->deadline = args->deadline;
+ calld->cancel_error = GRPC_ERROR_NONE;
gpr_atm_rel_store(&calld->subchannel_call, 0);
gpr_mu_init(&calld->mu);
calld->connected_subchannel = NULL;
@@ -724,6 +773,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_final_info *final_info,
void *and_free_memory) {
call_data *calld = elem->call_data;
+ grpc_deadline_state_destroy(exec_ctx, elem);
+ GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
@@ -760,10 +811,12 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};
-void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
- grpc_resolver *resolver) {
+void grpc_client_channel_finish_initialization(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver,
+ grpc_client_channel_factory *client_channel_factory) {
/* post construction initialization: set the transport setup pointer */
+ GPR_ASSERT(client_channel_factory != NULL);
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu);
@@ -778,6 +831,8 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
// grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result,
// &chand->on_resolver_result_changed);
// }
+ chand->client_channel_factory = client_channel_factory;
+ grpc_client_channel_factory_ref(client_channel_factory);
gpr_mu_unlock(&chand->mu);
}
diff --git a/src/core/ext/client_config/client_channel.h b/src/core/ext/client_config/client_channel.h
index 1e47ad34ad..abb5ac4d87 100644
--- a/src/core/ext/client_config/client_channel.h
+++ b/src/core/ext/client_config/client_channel.h
@@ -34,6 +34,7 @@
#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H
#define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H
+#include "src/core/ext/client_config/client_channel_factory.h"
#include "src/core/ext/client_config/resolver.h"
#include "src/core/lib/channel/channel_stack.h"
@@ -46,12 +47,12 @@
extern const grpc_channel_filter grpc_client_channel_filter;
-/* post-construction initializer to let the client channel know which
- transport setup it should cancel upon destruction, or initiate when it needs
- a connection */
-void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
- grpc_resolver *resolver);
+/* Post-construction initializer to give the client channel its resolver
+ and factory. */
+void grpc_client_channel_finish_initialization(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver,
+ grpc_client_channel_factory *client_channel_factory);
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
diff --git a/src/core/ext/client_config/client_config_plugin.c b/src/core/ext/client_config/client_config_plugin.c
index 5e31613420..dc3629d383 100644
--- a/src/core/ext/client_config/client_config_plugin.c
+++ b/src/core/ext/client_config/client_config_plugin.c
@@ -43,10 +43,6 @@
#include "src/core/ext/client_config/subchannel_index.h"
#include "src/core/lib/surface/channel_init.h"
-#ifndef GRPC_DEFAULT_NAME_PREFIX
-#define GRPC_DEFAULT_NAME_PREFIX "dns:///"
-#endif
-
static bool append_filter(grpc_channel_stack_builder *builder, void *arg) {
return grpc_channel_stack_builder_append_filter(
builder, (const grpc_channel_filter *)arg, NULL, NULL);
@@ -79,7 +75,7 @@ static bool set_default_host_if_unset(grpc_channel_stack_builder *builder,
void grpc_client_config_init(void) {
grpc_lb_policy_registry_init();
- grpc_resolver_registry_init(GRPC_DEFAULT_NAME_PREFIX);
+ grpc_resolver_registry_init();
grpc_subchannel_index_init();
grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MIN,
set_default_host_if_unset, NULL);
diff --git a/src/core/ext/client_config/http_connect_handshaker.c b/src/core/ext/client_config/http_connect_handshaker.c
new file mode 100644
index 0000000000..fda1df173e
--- /dev/null
+++ b/src/core/ext/client_config/http_connect_handshaker.c
@@ -0,0 +1,275 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_config/http_connect_handshaker.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/client_config/uri_parser.h"
+#include "src/core/lib/http/format_request.h"
+#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/support/env.h"
+
+typedef struct http_connect_handshaker {
+ // Base class. Must be first.
+ grpc_handshaker base;
+
+ char* proxy_server;
+ char* server_name;
+
+ // State saved while performing the handshake.
+ grpc_endpoint* endpoint;
+ grpc_channel_args* args;
+ grpc_handshaker_done_cb cb;
+ void* user_data;
+
+ // Objects for processing the HTTP CONNECT request and response.
+ gpr_slice_buffer write_buffer;
+ gpr_slice_buffer* read_buffer; // Ownership passes through this object.
+ grpc_closure request_done_closure;
+ grpc_closure response_read_closure;
+ grpc_http_parser http_parser;
+ grpc_http_response http_response;
+ grpc_timer timeout_timer;
+
+ gpr_refcount refcount;
+} http_connect_handshaker;
+
+// Unref and clean up handshaker.
+static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) {
+ if (gpr_unref(&handshaker->refcount)) {
+ gpr_free(handshaker->proxy_server);
+ gpr_free(handshaker->server_name);
+ gpr_slice_buffer_destroy(&handshaker->write_buffer);
+ grpc_http_parser_destroy(&handshaker->http_parser);
+ grpc_http_response_destroy(&handshaker->http_response);
+ gpr_free(handshaker);
+ }
+}
+
+// Callback invoked when deadline is exceeded.
+static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ http_connect_handshaker* handshaker = arg;
+ if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled.
+ grpc_endpoint_shutdown(exec_ctx, handshaker->endpoint);
+ }
+ http_connect_handshaker_unref(handshaker);
+}
+
+// Callback invoked when finished writing HTTP CONNECT request.
+static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ http_connect_handshaker* handshaker = arg;
+ if (error != GRPC_ERROR_NONE) {
+ // If the write failed, invoke the callback immediately with the error.
+ handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args,
+ handshaker->read_buffer, handshaker->user_data,
+ GRPC_ERROR_REF(error));
+ } else {
+ // Otherwise, read the response.
+ grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer,
+ &handshaker->response_read_closure);
+ }
+}
+
+// Callback invoked for reading HTTP CONNECT response.
+static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ http_connect_handshaker* handshaker = arg;
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback.
+ goto done;
+ }
+ // Add buffer to parser.
+ for (size_t i = 0; i < handshaker->read_buffer->count; ++i) {
+ if (GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) {
+ size_t body_start_offset = 0;
+ error = grpc_http_parser_parse(&handshaker->http_parser,
+ handshaker->read_buffer->slices[i],
+ &body_start_offset);
+ if (error != GRPC_ERROR_NONE) goto done;
+ if (handshaker->http_parser.state == GRPC_HTTP_BODY) {
+ // We've gotten back a successul response, so stop the timeout timer.
+ grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer);
+ // Remove the data we've already read from the read buffer,
+ // leaving only the leftover bytes (if any).
+ gpr_slice_buffer tmp_buffer;
+ gpr_slice_buffer_init(&tmp_buffer);
+ if (body_start_offset <
+ GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i])) {
+ gpr_slice_buffer_add(
+ &tmp_buffer,
+ gpr_slice_split_tail(&handshaker->read_buffer->slices[i],
+ body_start_offset));
+ }
+ gpr_slice_buffer_addn(&tmp_buffer,
+ &handshaker->read_buffer->slices[i + 1],
+ handshaker->read_buffer->count - i - 1);
+ gpr_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer);
+ gpr_slice_buffer_destroy(&tmp_buffer);
+ break;
+ }
+ }
+ }
+ // If we're not done reading the response, read more data.
+ // TODO(roth): In practice, I suspect that the response to a CONNECT
+ // request will never include a body, in which case this check is
+ // sufficient. However, the language of RFC-2817 doesn't explicitly
+ // forbid the response from including a body. If there is a body,
+ // it's possible that we might have parsed part but not all of the
+ // body, in which case this check will cause us to fail to parse the
+ // remainder of the body. If that ever becomes an issue, we may
+ // need to fix the HTTP parser to understand when the body is
+ // complete (e.g., handling chunked transfer encoding or looking
+ // at the Content-Length: header).
+ if (handshaker->http_parser.state != GRPC_HTTP_BODY) {
+ gpr_slice_buffer_reset_and_unref(handshaker->read_buffer);
+ grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer,
+ &handshaker->response_read_closure);
+ return;
+ }
+ // Make sure we got a 2xx response.
+ if (handshaker->http_response.status < 200 ||
+ handshaker->http_response.status >= 300) {
+ char* msg;
+ gpr_asprintf(&msg, "HTTP proxy returned response code %d",
+ handshaker->http_response.status);
+ error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ }
+done:
+ // Invoke handshake-done callback.
+ handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args,
+ handshaker->read_buffer, handshaker->user_data, error);
+}
+
+//
+// Public handshaker methods
+//
+
+static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker_in) {
+ http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
+ http_connect_handshaker_unref(handshaker);
+}
+
+static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker) {}
+
+static void http_connect_handshaker_do_handshake(
+ grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in,
+ grpc_endpoint* endpoint, grpc_channel_args* args,
+ gpr_slice_buffer* read_buffer, gpr_timespec deadline,
+ grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb,
+ void* user_data) {
+ http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
+ // Save state in the handshaker object.
+ handshaker->endpoint = endpoint;
+ handshaker->args = args;
+ handshaker->cb = cb;
+ handshaker->user_data = user_data;
+ handshaker->read_buffer = read_buffer;
+ // Send HTTP CONNECT request.
+ gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s",
+ handshaker->server_name, handshaker->proxy_server);
+ grpc_httpcli_request request;
+ memset(&request, 0, sizeof(request));
+ request.host = handshaker->proxy_server;
+ request.http.method = "CONNECT";
+ request.http.path = handshaker->server_name;
+ request.handshaker = &grpc_httpcli_plaintext;
+ gpr_slice request_slice = grpc_httpcli_format_connect_request(&request);
+ gpr_slice_buffer_add(&handshaker->write_buffer, request_slice);
+ grpc_endpoint_write(exec_ctx, endpoint, &handshaker->write_buffer,
+ &handshaker->request_done_closure);
+ // Set timeout timer. The timer gets a reference to the handshaker.
+ gpr_ref(&handshaker->refcount);
+ grpc_timer_init(exec_ctx, &handshaker->timeout_timer,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ on_timeout, handshaker, gpr_now(GPR_CLOCK_MONOTONIC));
+}
+
+static const struct grpc_handshaker_vtable http_connect_handshaker_vtable = {
+ http_connect_handshaker_destroy, http_connect_handshaker_shutdown,
+ http_connect_handshaker_do_handshake};
+
+grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
+ const char* server_name) {
+ GPR_ASSERT(proxy_server != NULL);
+ GPR_ASSERT(server_name != NULL);
+ http_connect_handshaker* handshaker =
+ gpr_malloc(sizeof(http_connect_handshaker));
+ memset(handshaker, 0, sizeof(*handshaker));
+ grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base);
+ handshaker->proxy_server = gpr_strdup(proxy_server);
+ handshaker->server_name = gpr_strdup(server_name);
+ gpr_slice_buffer_init(&handshaker->write_buffer);
+ grpc_closure_init(&handshaker->request_done_closure, on_write_done,
+ handshaker);
+ grpc_closure_init(&handshaker->response_read_closure, on_read_done,
+ handshaker);
+ grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE,
+ &handshaker->http_response);
+ gpr_ref_init(&handshaker->refcount, 1);
+ return &handshaker->base;
+}
+
+char* grpc_get_http_proxy_server() {
+ char* uri_str = gpr_getenv("http_proxy");
+ if (uri_str == NULL) return NULL;
+ grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */);
+ char* proxy_name = NULL;
+ if (uri == NULL || uri->authority == NULL) {
+ gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var");
+ goto done;
+ }
+ if (strcmp(uri->scheme, "http") != 0) {
+ gpr_log(GPR_ERROR, "'%s' scheme not supported in proxy URI", uri->scheme);
+ goto done;
+ }
+ if (strchr(uri->authority, '@') != NULL) {
+ gpr_log(GPR_ERROR, "userinfo not supported in proxy URI");
+ goto done;
+ }
+ proxy_name = gpr_strdup(uri->authority);
+done:
+ gpr_free(uri_str);
+ grpc_uri_destroy(uri);
+ return proxy_name;
+}
diff --git a/src/core/ext/client_config/subchannel_factory.c b/src/core/ext/client_config/http_connect_handshaker.h
index d1e4d75a02..1fc3948267 100644
--- a/src/core/ext/client_config/subchannel_factory.c
+++ b/src/core/ext/client_config/http_connect_handshaker.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,19 +31,17 @@
*
*/
-#include "src/core/ext/client_config/subchannel_factory.h"
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H
-void grpc_subchannel_factory_ref(grpc_subchannel_factory* factory) {
- factory->vtable->ref(factory);
-}
+#include "src/core/lib/channel/handshaker.h"
-void grpc_subchannel_factory_unref(grpc_exec_ctx* exec_ctx,
- grpc_subchannel_factory* factory) {
- factory->vtable->unref(exec_ctx, factory);
-}
+/// Does NOT take ownership of \a proxy_server or \a server_name.
+grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
+ const char* server_name);
-grpc_subchannel* grpc_subchannel_factory_create_subchannel(
- grpc_exec_ctx* exec_ctx, grpc_subchannel_factory* factory,
- grpc_subchannel_args* args) {
- return factory->vtable->create_subchannel(exec_ctx, factory, args);
-}
+/// Returns the name of the proxy to use, or NULL if no proxy is configured.
+/// Caller takes ownership of result.
+char* grpc_get_http_proxy_server();
+
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H */
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index 8b980b2cca..46391272a6 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -100,26 +100,26 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
}
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
- return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata,
- initial_metadata_flags, target, on_complete);
+ return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data,
+ on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_connected_subchannel **target) {
- policy->vtable->cancel_pick(exec_ctx, policy, target);
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
+ policy->vtable->cancel_pick(exec_ctx, policy, target, error);
}
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq) {
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
- initial_metadata_flags_eq);
+ initial_metadata_flags_eq, error);
}
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index a2f5446fc6..7fb3e08cb3 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -53,23 +53,38 @@ struct grpc_lb_policy {
grpc_pollset_set *interested_parties;
};
+/** Extra arguments for an LB pick */
+typedef struct grpc_lb_policy_pick_args {
+ /** Parties interested in the pick's progress */
+ grpc_polling_entity *pollent;
+ /** Initial metadata associated with the picking call. */
+ grpc_metadata_batch *initial_metadata;
+ /** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */
+ uint32_t initial_metadata_flags;
+ /** Storage for LB token in \a initial_metadata, or NULL if not used */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+} grpc_lb_policy_pick_args;
+
struct grpc_lb_policy_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
-
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
- /** implement grpc_lb_policy_pick */
+ /** \see grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target, grpc_closure *on_complete);
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
+ grpc_closure *on_complete);
+
+ /** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_connected_subchannel **target);
+ grpc_connected_subchannel **target, grpc_error *error);
+
+ /** \see grpc_lb_policy_cancel_picks */
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq);
+ uint32_t initial_metadata_flags_eq, grpc_error *error);
+ /** \see grpc_lb_policy_ping_one */
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
@@ -83,8 +98,7 @@ struct grpc_lb_policy_vtable {
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy. Calling with a NULL \a
- state cancels the subscription.
- */
+ state cancels the subscription. */
void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
grpc_connectivity_state *state,
@@ -124,30 +138,40 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
-/** Given initial metadata in \a initial_metadata, find an appropriate
- target for this rpc, and 'return' it by calling \a on_complete after setting
- \a target.
- Picking can be asynchronous. Any IO should be done under \a pollent. */
+/** Find an appropriate target for this call, based on \a pick_args.
+ Picking can be synchronous or asynchronous. In the synchronous case, when a
+ pick is readily available, it'll be returned in \a target and a non-zero
+ value will be returned.
+ In the asynchronous case, zero is returned and \a on_complete will be called
+ once \a target and \a user_data have been set. Any IO should be done under
+ \a pick_args->pollent. The opaque \a user_data output argument corresponds
+ to information that may need be propagated from the LB policy. It may be
+ NULL. Errors are signaled by receiving a NULL \a *target. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete);
+/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
+ against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
+/** Cancel picks for \a target.
+ The \a on_complete callback of the pending picks will be invoked with \a
+ *target set to NULL. */
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_connected_subchannel **target);
+ grpc_connected_subchannel **target,
+ grpc_error *error);
-/** Cancel all pending picks which have:
- (initial_metadata_flags & initial_metadata_flags_mask) ==
- initial_metadata_flags_eq */
+/** Cancel all pending picks for which their \a initial_metadata_flags (as given
+ in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
+ when AND'd with \a initial_metadata_flags_mask */
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq);
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error);
/** Try to enter a READY connectivity state */
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
diff --git a/src/core/ext/client_config/lb_policy_factory.c b/src/core/ext/client_config/lb_policy_factory.c
index 70e46ef3cf..c17af91a09 100644
--- a/src/core/ext/client_config/lb_policy_factory.c
+++ b/src/core/ext/client_config/lb_policy_factory.c
@@ -31,8 +31,66 @@
*
*/
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
#include "src/core/ext/client_config/lb_policy_factory.h"
+grpc_lb_addresses* grpc_lb_addresses_create(size_t num_addresses) {
+ grpc_lb_addresses* addresses = gpr_malloc(sizeof(grpc_lb_addresses));
+ addresses->num_addresses = num_addresses;
+ const size_t addresses_size = sizeof(grpc_lb_address) * num_addresses;
+ addresses->addresses = gpr_malloc(addresses_size);
+ memset(addresses->addresses, 0, addresses_size);
+ return addresses;
+}
+
+grpc_lb_addresses* grpc_lb_addresses_copy(grpc_lb_addresses* addresses,
+ void* (*user_data_copy)(void*)) {
+ grpc_lb_addresses* new_addresses =
+ grpc_lb_addresses_create(addresses->num_addresses);
+ memcpy(new_addresses->addresses, addresses->addresses,
+ sizeof(grpc_lb_address) * addresses->num_addresses);
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (new_addresses->addresses[i].balancer_name != NULL) {
+ new_addresses->addresses[i].balancer_name =
+ gpr_strdup(new_addresses->addresses[i].balancer_name);
+ }
+ if (user_data_copy != NULL) {
+ new_addresses->addresses[i].user_data =
+ user_data_copy(new_addresses->addresses[i].user_data);
+ }
+ }
+ return new_addresses;
+}
+
+void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
+ void* address, size_t address_len,
+ bool is_balancer, char* balancer_name,
+ void* user_data) {
+ GPR_ASSERT(index < addresses->num_addresses);
+ grpc_lb_address* target = &addresses->addresses[index];
+ memcpy(target->address.addr, address, address_len);
+ target->address.len = address_len;
+ target->is_balancer = is_balancer;
+ target->balancer_name = balancer_name;
+ target->user_data = user_data;
+}
+
+void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses,
+ void (*user_data_destroy)(void*)) {
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ gpr_free(addresses->addresses[i].balancer_name);
+ if (user_data_destroy != NULL) {
+ user_data_destroy(addresses->addresses[i].user_data);
+ }
+ }
+ gpr_free(addresses->addresses);
+ gpr_free(addresses);
+}
+
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
factory->vtable->ref(factory);
}
diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index da1de3579a..ade55704f2 100644
--- a/src/core/ext/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -36,9 +36,9 @@
#include "src/core/ext/client_config/client_channel_factory.h"
#include "src/core/ext/client_config/lb_policy.h"
-#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/resolve_address.h"
typedef struct grpc_lb_policy_factory grpc_lb_policy_factory;
typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable;
@@ -47,9 +47,54 @@ struct grpc_lb_policy_factory {
const grpc_lb_policy_factory_vtable *vtable;
};
+/** A resolved address alongside any LB related information associated with it.
+ * \a user_data, if not NULL, contains opaque data meant to be consumed by the
+ * gRPC LB policy. Note that no all LB policies support \a user_data as input.
+ * Those who don't will simply ignore it and will correspondingly return NULL in
+ * their namesake pick() output argument. */
+typedef struct grpc_lb_address {
+ grpc_resolved_address address;
+ bool is_balancer;
+ char *balancer_name; /* For secure naming. */
+ void *user_data;
+} grpc_lb_address;
+
+typedef struct grpc_lb_addresses {
+ size_t num_addresses;
+ grpc_lb_address *addresses;
+} grpc_lb_addresses;
+
+/** Returns a grpc_addresses struct with enough space for
+ * \a num_addresses addresses. */
+grpc_lb_addresses *grpc_lb_addresses_create(size_t num_addresses);
+
+/** Creates a copy of \a addresses. If \a user_data_copy is not NULL,
+ * it will be invoked to copy the \a user_data field of each address. */
+grpc_lb_addresses *grpc_lb_addresses_copy(grpc_lb_addresses *addresses,
+ void *(*user_data_copy)(void *));
+
+/** Sets the value of the address at index \a index of \a addresses.
+ * \a address is a socket address of length \a address_len.
+ * Takes ownership of \a balancer_name. */
+void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index,
+ void *address, size_t address_len,
+ bool is_balancer, char *balancer_name,
+ void *user_data);
+
+/** Destroys \a addresses. If \a user_data_destroy is not NULL, it will
+ * be invoked to destroy the \a user_data field of each address. */
+void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses,
+ void (*user_data_destroy)(void *));
+
+/** Arguments passed to LB policies. */
+/* TODO(roth, ctiller): Consider replacing this struct with
+ grpc_channel_args. See comment in resolver_result.h for details. */
typedef struct grpc_lb_policy_args {
- grpc_resolved_addresses *addresses;
+ const char *server_name;
+ grpc_lb_addresses *addresses;
grpc_client_channel_factory *client_channel_factory;
+ /* Can be used to pass implementation-specific parameters to the LB policy. */
+ grpc_channel_args *additional_args;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {
diff --git a/src/core/ext/client_config/resolver_factory.h b/src/core/ext/client_config/resolver_factory.h
index f69bf79564..9ec5b9a70e 100644
--- a/src/core/ext/client_config/resolver_factory.h
+++ b/src/core/ext/client_config/resolver_factory.h
@@ -47,10 +47,7 @@ struct grpc_resolver_factory {
const grpc_resolver_factory_vtable *vtable;
};
-typedef struct grpc_resolver_args {
- grpc_uri *uri;
- grpc_client_channel_factory *client_channel_factory;
-} grpc_resolver_args;
+typedef struct grpc_resolver_args { grpc_uri *uri; } grpc_resolver_args;
struct grpc_resolver_factory_vtable {
void (*ref)(grpc_resolver_factory *factory);
diff --git a/src/core/ext/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c
index e7a4abd568..bd5c683878 100644
--- a/src/core/ext/client_config/resolver_registry.c
+++ b/src/core/ext/client_config/resolver_registry.c
@@ -40,22 +40,20 @@
#include <grpc/support/string_util.h>
#define MAX_RESOLVERS 10
+#define DEFAULT_RESOLVER_PREFIX_MAX_LENGTH 32
static grpc_resolver_factory *g_all_of_the_resolvers[MAX_RESOLVERS];
static int g_number_of_resolvers = 0;
-static char *g_default_resolver_prefix;
+static char g_default_resolver_prefix[DEFAULT_RESOLVER_PREFIX_MAX_LENGTH] =
+ "dns:///";
-void grpc_resolver_registry_init(const char *default_resolver_prefix) {
- g_default_resolver_prefix = gpr_strdup(default_resolver_prefix);
-}
+void grpc_resolver_registry_init() {}
void grpc_resolver_registry_shutdown(void) {
- int i;
- for (i = 0; i < g_number_of_resolvers; i++) {
+ for (int i = 0; i < g_number_of_resolvers; i++) {
grpc_resolver_factory_unref(g_all_of_the_resolvers[i]);
}
- gpr_free(g_default_resolver_prefix);
// FIXME(ctiller): this should live in grpc_resolver_registry_init,
// however that would have the client_config plugin call this AFTER we start
// registering resolvers from third party plugins, and so they'd never show
@@ -65,6 +63,17 @@ void grpc_resolver_registry_shutdown(void) {
g_number_of_resolvers = 0;
}
+void grpc_resolver_registry_set_default_prefix(
+ const char *default_resolver_prefix) {
+ const size_t len = strlen(default_resolver_prefix);
+ GPR_ASSERT(len < DEFAULT_RESOLVER_PREFIX_MAX_LENGTH &&
+ "default resolver prefix too long");
+ GPR_ASSERT(len > 0 && "default resolver prefix can't be empty");
+ // By the previous assert, default_resolver_prefix is safe to be copied with a
+ // plain strcpy.
+ strcpy(g_default_resolver_prefix, default_resolver_prefix);
+}
+
void grpc_register_resolver_type(grpc_resolver_factory *factory) {
int i;
for (i = 0; i < g_number_of_resolvers; i++) {
@@ -108,35 +117,27 @@ static grpc_resolver_factory *resolve_factory(const char *target,
*uri = grpc_uri_parse(target, 1);
factory = lookup_factory_by_uri(*uri);
if (factory == NULL) {
- if (g_default_resolver_prefix != NULL) {
- grpc_uri_destroy(*uri);
- gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target);
- *uri = grpc_uri_parse(tmp, 1);
- factory = lookup_factory_by_uri(*uri);
- if (factory == NULL) {
- grpc_uri_destroy(grpc_uri_parse(target, 0));
- grpc_uri_destroy(grpc_uri_parse(tmp, 0));
- gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target,
- tmp);
- }
- gpr_free(tmp);
- } else {
+ grpc_uri_destroy(*uri);
+ gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target);
+ *uri = grpc_uri_parse(tmp, 1);
+ factory = lookup_factory_by_uri(*uri);
+ if (factory == NULL) {
grpc_uri_destroy(grpc_uri_parse(target, 0));
- gpr_log(GPR_ERROR, "don't know how to resolve '%s'", target);
+ grpc_uri_destroy(grpc_uri_parse(tmp, 0));
+ gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, tmp);
}
+ gpr_free(tmp);
}
return factory;
}
-grpc_resolver *grpc_resolver_create(
- const char *target, grpc_client_channel_factory *client_channel_factory) {
+grpc_resolver *grpc_resolver_create(const char *target) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
grpc_resolver *resolver;
grpc_resolver_args args;
memset(&args, 0, sizeof(args));
args.uri = uri;
- args.client_channel_factory = client_channel_factory;
resolver = grpc_resolver_factory_create_resolver(factory, &args);
grpc_uri_destroy(uri);
return resolver;
diff --git a/src/core/ext/client_config/resolver_registry.h b/src/core/ext/client_config/resolver_registry.h
index 5ef1383cd3..4c6279b978 100644
--- a/src/core/ext/client_config/resolver_registry.h
+++ b/src/core/ext/client_config/resolver_registry.h
@@ -36,9 +36,12 @@
#include "src/core/ext/client_config/resolver_factory.h"
-void grpc_resolver_registry_init(const char *default_prefix);
+void grpc_resolver_registry_init();
void grpc_resolver_registry_shutdown(void);
+/** Set the default URI prefix to \a default_prefix. */
+void grpc_resolver_registry_set_default_prefix(const char *default_prefix);
+
/** Register a resolver type.
URI's of \a scheme will be resolved with the given resolver.
If \a priority is greater than zero, then the resolver will be eligible
@@ -55,8 +58,7 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
If a resolver factory was found, use it to instantiate a resolver and
return it.
If a resolver factory was not found, return NULL. */
-grpc_resolver *grpc_resolver_create(
- const char *target, grpc_client_channel_factory *client_channel_factory);
+grpc_resolver *grpc_resolver_create(const char *target);
/** Find a resolver factory given a name and return an (owned-by-the-caller)
* reference to it */
diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c
index c6c4166e83..63480d152b 100644
--- a/src/core/ext/client_config/resolver_result.c
+++ b/src/core/ext/client_config/resolver_result.c
@@ -1,75 +1,94 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
+//
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
#include "src/core/ext/client_config/resolver_result.h"
#include <string.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/channel/channel_args.h"
struct grpc_resolver_result {
gpr_refcount refs;
- grpc_lb_policy *lb_policy;
+ char* server_name;
+ grpc_lb_addresses* addresses;
+ char* lb_policy_name;
+ grpc_channel_args* lb_policy_args;
};
-grpc_resolver_result *grpc_resolver_result_create() {
- grpc_resolver_result *c = gpr_malloc(sizeof(*c));
- memset(c, 0, sizeof(*c));
- gpr_ref_init(&c->refs, 1);
- return c;
+grpc_resolver_result* grpc_resolver_result_create(
+ const char* server_name, grpc_lb_addresses* addresses,
+ const char* lb_policy_name, grpc_channel_args* lb_policy_args) {
+ grpc_resolver_result* result = gpr_malloc(sizeof(*result));
+ memset(result, 0, sizeof(*result));
+ gpr_ref_init(&result->refs, 1);
+ result->server_name = gpr_strdup(server_name);
+ result->addresses = addresses;
+ result->lb_policy_name = gpr_strdup(lb_policy_name);
+ result->lb_policy_args = lb_policy_args;
+ return result;
}
-void grpc_resolver_result_ref(grpc_resolver_result *c) { gpr_ref(&c->refs); }
+void grpc_resolver_result_ref(grpc_resolver_result* result) {
+ gpr_ref(&result->refs);
+}
-void grpc_resolver_result_unref(grpc_exec_ctx *exec_ctx,
- grpc_resolver_result *c) {
- if (gpr_unref(&c->refs)) {
- if (c->lb_policy != NULL) {
- GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "resolver_result");
- }
- gpr_free(c);
+void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
+ grpc_resolver_result* result) {
+ if (gpr_unref(&result->refs)) {
+ gpr_free(result->server_name);
+ grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */);
+ gpr_free(result->lb_policy_name);
+ grpc_channel_args_destroy(result->lb_policy_args);
+ gpr_free(result);
}
}
-void grpc_resolver_result_set_lb_policy(grpc_resolver_result *c,
- grpc_lb_policy *lb_policy) {
- GPR_ASSERT(c->lb_policy == NULL);
- if (lb_policy) {
- GRPC_LB_POLICY_REF(lb_policy, "resolver_result");
- }
- c->lb_policy = lb_policy;
+const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result) {
+ return result->server_name;
+}
+
+grpc_lb_addresses* grpc_resolver_result_get_addresses(
+ grpc_resolver_result* result) {
+ return result->addresses;
+}
+
+const char* grpc_resolver_result_get_lb_policy_name(
+ grpc_resolver_result* result) {
+ return result->lb_policy_name;
}
-grpc_lb_policy *grpc_resolver_result_get_lb_policy(grpc_resolver_result *c) {
- return c->lb_policy;
+grpc_channel_args* grpc_resolver_result_get_lb_policy_args(
+ grpc_resolver_result* result) {
+ return result->lb_policy_args;
}
diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h
index 402f7dbd7e..414c2e2482 100644
--- a/src/core/ext/client_config/resolver_result.h
+++ b/src/core/ext/client_config/resolver_result.h
@@ -1,52 +1,74 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
+//
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H
#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H
-#include "src/core/ext/client_config/lb_policy.h"
+#include "src/core/ext/client_config/lb_policy_factory.h"
+#include "src/core/lib/iomgr/resolve_address.h"
-/** Results reported from a grpc_resolver. */
+// TODO(roth, ctiller): In the long term, we are considering replacing
+// the resolver_result data structure with grpc_channel_args. The idea is
+// that the resolver will return a set of channel args that contains the
+// information that is currently in the resolver_result struct. For
+// example, there will be specific args indicating the set of addresses
+// and the name of the LB policy to instantiate. Note that if we did
+// this, we would probably want to change the data structure of
+// grpc_channel_args such to a hash table or AVL or some other data
+// structure that does not require linear search to find keys.
+
+/// Results reported from a grpc_resolver.
typedef struct grpc_resolver_result grpc_resolver_result;
-grpc_resolver_result *grpc_resolver_result_create();
-void grpc_resolver_result_ref(grpc_resolver_result *client_config);
-void grpc_resolver_result_unref(grpc_exec_ctx *exec_ctx,
- grpc_resolver_result *client_config);
+/// Takes ownership of \a addresses and \a lb_policy_args.
+grpc_resolver_result* grpc_resolver_result_create(
+ const char* server_name, grpc_lb_addresses* addresses,
+ const char* lb_policy_name, grpc_channel_args* lb_policy_args);
+void grpc_resolver_result_ref(grpc_resolver_result* result);
+void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
+ grpc_resolver_result* result);
+
+/// Caller does NOT take ownership of result.
+const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result);
+
+/// Caller does NOT take ownership of result.
+grpc_lb_addresses* grpc_resolver_result_get_addresses(
+ grpc_resolver_result* result);
+
+/// Caller does NOT take ownership of result.
+const char* grpc_resolver_result_get_lb_policy_name(
+ grpc_resolver_result* result);
-void grpc_resolver_result_set_lb_policy(grpc_resolver_result *client_config,
- grpc_lb_policy *lb_policy);
-grpc_lb_policy *grpc_resolver_result_get_lb_policy(
- grpc_resolver_result *client_config);
+/// Caller does NOT take ownership of result.
+grpc_channel_args* grpc_resolver_result_get_lb_policy_args(
+ grpc_resolver_result* result);
#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index df35904b85..0bbaa3e382 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -33,6 +33,7 @@
#include "src/core/ext/client_config/subchannel.h"
+#include <limits.h>
#include <string.h>
#include <grpc/support/alloc.h>
@@ -219,8 +220,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
: gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val,
- old_val + delta, reason);
+ "SUBCHANNEL: %p %12s 0x%08d -> 0x%08d [%s]", c, purpose, (int)old_val,
+ (int)(old_val + delta), reason);
#endif
return old_val;
}
@@ -331,41 +332,40 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&c->connected, subchannel_connected, c);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
- gpr_backoff_init(&c->backoff_state,
- GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
- GRPC_SUBCHANNEL_RECONNECT_JITTER,
- GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
- GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ int initial_backoff_ms =
+ GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
+ int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
+ bool fixed_reconnect_backoff = false;
if (c->args) {
for (size_t i = 0; i < c->args->num_args; i++) {
if (0 == strcmp(c->args->args[i].key,
"grpc.testing.fixed_reconnect_backoff")) {
GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER);
- gpr_backoff_init(&c->backoff_state, 1.0, 0.0,
- c->args->args[i].value.integer,
- c->args->args[i].value.integer);
- }
- if (0 ==
- strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
- if (c->args->args[i].type == GRPC_ARG_INTEGER) {
- if (c->args->args[i].value.integer >= 0) {
- gpr_backoff_init(
- &c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
- GRPC_SUBCHANNEL_RECONNECT_JITTER,
- GPR_MIN(c->args->args[i].value.integer,
- GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000),
- c->args->args[i].value.integer);
- } else {
- gpr_log(GPR_ERROR, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS
- " : must be non-negative");
- }
- } else {
- gpr_log(GPR_ERROR,
- GRPC_ARG_MAX_RECONNECT_BACKOFF_MS " : must be an integer");
- }
+ fixed_reconnect_backoff = true;
+ initial_backoff_ms = max_backoff_ms = grpc_channel_arg_get_integer(
+ &c->args->args[i],
+ (grpc_integer_options){initial_backoff_ms, 100, INT_MAX});
+ } else if (0 == strcmp(c->args->args[i].key,
+ GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ max_backoff_ms = grpc_channel_arg_get_integer(
+ &c->args->args[i],
+ (grpc_integer_options){max_backoff_ms, 100, INT_MAX});
+ } else if (0 == strcmp(c->args->args[i].key,
+ GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ initial_backoff_ms = grpc_channel_arg_get_integer(
+ &c->args->args[i],
+ (grpc_integer_options){initial_backoff_ms, 100, INT_MAX});
}
}
}
+ gpr_backoff_init(
+ &c->backoff_state,
+ fixed_reconnect_backoff ? 1.0
+ : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
+ fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER,
+ initial_backoff_ms, max_backoff_ms);
gpr_mu_init(&c->mu);
return grpc_subchannel_index_register(exec_ctx, key, c);
@@ -504,14 +504,13 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *interested_parties,
grpc_connectivity_state *state,
grpc_closure *closure) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem;
- memset(&op, 0, sizeof(op));
- op.connectivity_state = state;
- op.on_connectivity_state_change = closure;
- op.bind_pollset_set = interested_parties;
+ op->connectivity_state = state;
+ op->on_connectivity_state_change = closure;
+ op->bind_pollset_set = interested_parties;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
}
void grpc_connected_subchannel_notify_on_state_change(
@@ -525,12 +524,11 @@ void grpc_connected_subchannel_notify_on_state_change(
void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *con,
grpc_closure *closure) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem;
- memset(&op, 0, sizeof(op));
- op.send_ping = closure;
+ op->send_ping = closure;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
}
static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
@@ -635,7 +633,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
c->have_alarm = 1;
grpc_connectivity_state_set(
exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
"connect_failed");
gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
const char *errmsg = grpc_error_string(error);
@@ -704,14 +704,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_polling_entity *pollent, grpc_subchannel_call **call) {
+ grpc_polling_entity *pollent, gpr_timespec deadline,
+ grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
*call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = con; // Ref is added below.
grpc_error *error =
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
- NULL, NULL, callstk);
+ NULL, NULL, deadline, callstk);
if (error != GRPC_ERROR_NONE) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);
diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h
index ae1d96e640..3330621071 100644
--- a/src/core/ext/client_config/subchannel.h
+++ b/src/core/ext/client_config/subchannel.h
@@ -110,7 +110,8 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
/** construct a subchannel call */
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
- grpc_polling_entity *pollent, grpc_subchannel_call **subchannel_call);
+ grpc_polling_entity *pollent, gpr_timespec deadline,
+ grpc_subchannel_call **subchannel_call);
/** process a transport level op */
void grpc_connected_subchannel_process_transport_op(
@@ -162,6 +163,8 @@ struct grpc_subchannel_args {
size_t filter_count;
/** Channel arguments to be supplied to the newly created channel */
const grpc_channel_args *args;
+ /** Server name */
+ const char *server_name;
/** Address to connect to */
struct sockaddr *addr;
size_t addr_len;
diff --git a/src/core/ext/client_config/subchannel_factory.h b/src/core/ext/client_config/subchannel_factory.h
deleted file mode 100644
index 0fb806d081..0000000000
--- a/src/core/ext/client_config/subchannel_factory.h
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
-#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
-
-#include "src/core/ext/client_config/subchannel.h"
-#include "src/core/lib/channel/channel_stack.h"
-
-typedef struct grpc_subchannel_factory grpc_subchannel_factory;
-typedef struct grpc_subchannel_factory_vtable grpc_subchannel_factory_vtable;
-
-/** Constructor for new configured channels.
- Creating decorators around this type is encouraged to adapt behavior. */
-struct grpc_subchannel_factory {
- const grpc_subchannel_factory_vtable *vtable;
-};
-
-struct grpc_subchannel_factory_vtable {
- void (*ref)(grpc_subchannel_factory *factory);
- void (*unref)(grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory);
- grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *factory,
- grpc_subchannel_args *args);
-};
-
-void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory);
-void grpc_subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *factory);
-
-/** Create a new grpc_subchannel */
-grpc_subchannel *grpc_subchannel_factory_create_subchannel(
- grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory,
- grpc_subchannel_args *args);
-
-#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */
diff --git a/src/core/ext/client_config/subchannel_index.c b/src/core/ext/client_config/subchannel_index.c
index 690cb16b96..673f85b8cb 100644
--- a/src/core/ext/client_config/subchannel_index.c
+++ b/src/core/ext/client_config/subchannel_index.c
@@ -38,6 +38,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include "src/core/lib/channel/channel_args.h"
@@ -85,6 +86,7 @@ static grpc_subchannel_key *create_key(
} else {
k->args.filters = NULL;
}
+ k->args.server_name = gpr_strdup(args->server_name);
k->args.addr_len = args->addr_len;
k->args.addr = gpr_malloc(args->addr_len);
if (k->args.addr_len > 0) {
@@ -111,6 +113,8 @@ static int subchannel_key_compare(grpc_subchannel_key *a,
if (c != 0) return c;
c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
if (c != 0) return c;
+ c = strcmp(a->args.server_name, b->args.server_name);
+ if (c != 0) return c;
if (a->args.addr_len) {
c = memcmp(a->args.addr, b->args.addr, a->args.addr_len);
if (c != 0) return c;
@@ -126,9 +130,10 @@ static int subchannel_key_compare(grpc_subchannel_key *a,
void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *k) {
grpc_connector_unref(exec_ctx, k->connector);
- gpr_free(k->args.addr);
gpr_free((grpc_channel_args *)k->args.filters);
grpc_channel_args_destroy((grpc_channel_args *)k->args.args);
+ gpr_free((void *)k->args.server_name);
+ gpr_free(k->args.addr);
gpr_free(k);
}