diff options
author | Yuchen Zeng <zyc@google.com> | 2016-10-05 14:00:02 -0700 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2016-10-05 14:00:02 -0700 |
commit | ac8bc42c8f8f835047550e70850e1f5fb2d56f97 (patch) | |
tree | fa9130481fca7857415b84918f7cf3d56de800df /src/core/ext/client_config | |
parent | 92b1825735e1ce03e74fff06d432b2ad2f12a338 (diff) | |
parent | a2e50c5b85b4c0d4d7a5f9624eb40856198ef508 (diff) |
Merge remote-tracking branch 'upstream/master' into call_holder_add_pollent
Diffstat (limited to 'src/core/ext/client_config')
18 files changed, 590 insertions, 217 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index f23e01bc94..a332f47413 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -42,9 +42,11 @@ #include <grpc/support/sync.h> #include <grpc/support/useful.h> +#include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/ext/client_config/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/channel/deadline_filter.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/profiling/timers.h" @@ -63,6 +65,8 @@ typedef struct client_channel_channel_data { grpc_resolver *resolver; /** have we started resolving this channel */ bool started_resolving; + /** client channel factory */ + grpc_client_channel_factory *client_channel_factory; /** mutex protecting client configuration, including all variables below in this data structure */ @@ -111,7 +115,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy_cancel_picks( exec_ctx, chand->lb_policy, /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY, - /* check= */ 0); + /* check= */ 0, GRPC_ERROR_REF(error)); } grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, reason); @@ -173,20 +177,53 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); if (chand->resolver_result != NULL) { - lb_policy = grpc_resolver_result_get_lb_policy(chand->resolver_result); + grpc_lb_policy_args lb_policy_args; + lb_policy_args.server_name = + grpc_resolver_result_get_server_name(chand->resolver_result); + lb_policy_args.addresses = + grpc_resolver_result_get_addresses(chand->resolver_result); + lb_policy_args.additional_args = + grpc_resolver_result_get_lb_policy_args(chand->resolver_result); + lb_policy_args.client_channel_factory = chand->client_channel_factory; + + // Special case: If all of the addresses are balancer addresses, + // assume that we should use the grpclb policy, regardless of what the + // resolver actually specified. + const char *lb_policy_name = + grpc_resolver_result_get_lb_policy_name(chand->resolver_result); + bool found_backend_address = false; + for (size_t i = 0; i < lb_policy_args.addresses->num_addresses; ++i) { + if (!lb_policy_args.addresses->addresses[i].is_balancer) { + found_backend_address = true; + break; + } + } + if (!found_backend_address) { + if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) { + gpr_log(GPR_INFO, + "resolver requested LB policy %s but provided only balancer " + "addresses, no backend addresses -- forcing use of grpclb LB " + "policy", + (lb_policy_name == NULL ? "(none)" : lb_policy_name)); + } + lb_policy_name = "grpclb"; + } + // Use pick_first if nothing was specified and we didn't select grpclb + // above. + if (lb_policy_name == NULL) lb_policy_name = "pick_first"; + + lb_policy = + grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); if (lb_policy != NULL) { - GRPC_LB_POLICY_REF(lb_policy, "channel"); GRPC_LB_POLICY_REF(lb_policy, "config_change"); GRPC_ERROR_UNREF(state_error); state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); } - grpc_resolver_result_unref(exec_ctx, chand->resolver_result); + chand->resolver_result = NULL; } - chand->resolver_result = NULL; - if (lb_policy != NULL) { grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties, chand->interested_parties); @@ -346,6 +383,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_resolver_shutdown(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); } + if (chand->client_channel_factory != NULL) { + grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory); + } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, chand->lb_policy->interested_parties, @@ -377,6 +417,17 @@ typedef enum { for initial metadata before trying to create a call object, and handling cancellation gracefully. */ typedef struct client_channel_call_data { + // State for handling deadlines. + // The code in deadline_filter.c requires this to be the first field. + // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state + // and this struct both independently store a pointer to the call + // stack and each has its own mutex. If/when we have time, find a way + // to avoid this without breaking the grpc_deadline_state abstraction. + grpc_deadline_state deadline_state; + gpr_timespec deadline; + + grpc_error *cancel_error; + /** either 0 for no call, 1 for cancelled, or a pointer to a grpc_subchannel_call */ gpr_atm subchannel_call; @@ -475,7 +526,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, gpr_atm_no_barrier_store(&calld->subchannel_call, 1); fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING( "Failed to create subchannel", &error, 1)); - } else if (1 == gpr_atm_acq_load(&calld->subchannel_call)) { + } else if (GET_CALL(calld) == CANCELLED_CALL) { /* already cancelled before subchannel became ready */ fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING( @@ -483,7 +534,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, } else { grpc_subchannel_call *subchannel_call = NULL; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, + exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); @@ -524,7 +575,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready); + grpc_closure *on_ready, grpc_error *error); static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -535,7 +586,8 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); } else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, - cpa->connected_subchannel, cpa->on_ready)) { + cpa->connected_subchannel, cpa->on_ready, + GRPC_ERROR_NONE)) { grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); } gpr_free(cpa); @@ -545,7 +597,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready) { + grpc_closure *on_ready, grpc_error *error) { GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; @@ -559,28 +611,33 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, - connected_subchannel); + connected_subchannel, GRPC_ERROR_REF(error)); } for (closure = chand->waiting_for_config_closures.head; closure != NULL; closure = closure->next_data.next) { cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE("Pick cancelled"), NULL); + grpc_exec_ctx_sched( + exec_ctx, cpa->on_ready, + GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL); } } gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); + GRPC_ERROR_UNREF(error); return true; } + GPR_ASSERT(error == GRPC_ERROR_NONE); if (chand->lb_policy != NULL) { grpc_lb_policy *lb_policy = chand->lb_policy; int r; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); gpr_mu_unlock(&chand->mu); + // TODO(dgq): make this deadline configurable somehow. const grpc_lb_policy_pick_args inputs = { - initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem}; + initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem, + gpr_inf_future(GPR_CLOCK_MONOTONIC)}; r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); @@ -624,12 +681,13 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); /* try to (atomically) get the call */ grpc_subchannel_call *call = GET_CALL(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -645,8 +703,8 @@ retry: call = GET_CALL(calld); if (call == CANCELLED_CALL) { gpr_mu_unlock(&calld->mu); - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -662,18 +720,24 @@ retry: (gpr_atm)(uintptr_t)CANCELLED_CALL)) { goto retry; } else { + // Stash a copy of cancel_error in our call data, so that we can use + // it for subsequent operations. This ensures that if the call is + // cancelled before any ops are passed down (e.g., if the deadline + // is in the past when the call starts), we can return the right + // error to the caller when the first op does get passed down. + calld->cancel_error = GRPC_ERROR_REF(op->cancel_error); switch (calld->creation_phase) { case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error)); break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel, - NULL); + NULL, GRPC_ERROR_REF(op->cancel_error)); break; } gpr_mu_unlock(&calld->mu); - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(op->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -690,7 +754,8 @@ retry: that IO of the lb_policy and resolver could be done under it. */ if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, op->send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step)) { + &calld->connected_subchannel, &calld->next_step, + GRPC_ERROR_NONE)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } else { @@ -703,7 +768,7 @@ retry: calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, + exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; @@ -726,6 +791,9 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { call_data *calld = elem->call_data; + grpc_deadline_state_init(exec_ctx, elem, args); + calld->deadline = args->deadline; + calld->cancel_error = GRPC_ERROR_NONE; gpr_atm_rel_store(&calld->subchannel_call, 0); gpr_mu_init(&calld->mu); calld->connected_subchannel = NULL; @@ -744,6 +812,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_final_info *final_info, void *and_free_memory) { call_data *calld = elem->call_data; + grpc_deadline_state_destroy(exec_ctx, elem); + GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); @@ -780,10 +850,12 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; -void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, - grpc_resolver *resolver) { +void grpc_client_channel_finish_initialization( + grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, + grpc_resolver *resolver, + grpc_client_channel_factory *client_channel_factory) { /* post construction initialization: set the transport setup pointer */ + GPR_ASSERT(client_channel_factory != NULL); grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu); @@ -797,6 +869,8 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result, &chand->on_resolver_result_changed); } + chand->client_channel_factory = client_channel_factory; + grpc_client_channel_factory_ref(client_channel_factory); gpr_mu_unlock(&chand->mu); } diff --git a/src/core/ext/client_config/client_channel.h b/src/core/ext/client_config/client_channel.h index 1e47ad34ad..abb5ac4d87 100644 --- a/src/core/ext/client_config/client_channel.h +++ b/src/core/ext/client_config/client_channel.h @@ -34,6 +34,7 @@ #ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H #define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H +#include "src/core/ext/client_config/client_channel_factory.h" #include "src/core/ext/client_config/resolver.h" #include "src/core/lib/channel/channel_stack.h" @@ -46,12 +47,12 @@ extern const grpc_channel_filter grpc_client_channel_filter; -/* post-construction initializer to let the client channel know which - transport setup it should cancel upon destruction, or initiate when it needs - a connection */ -void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, - grpc_resolver *resolver); +/* Post-construction initializer to give the client channel its resolver + and factory. */ +void grpc_client_channel_finish_initialization( + grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, + grpc_resolver *resolver, + grpc_client_channel_factory *client_channel_factory); grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); diff --git a/src/core/ext/client_config/client_config_plugin.c b/src/core/ext/client_config/client_config_plugin.c index 5e31613420..dc3629d383 100644 --- a/src/core/ext/client_config/client_config_plugin.c +++ b/src/core/ext/client_config/client_config_plugin.c @@ -43,10 +43,6 @@ #include "src/core/ext/client_config/subchannel_index.h" #include "src/core/lib/surface/channel_init.h" -#ifndef GRPC_DEFAULT_NAME_PREFIX -#define GRPC_DEFAULT_NAME_PREFIX "dns:///" -#endif - static bool append_filter(grpc_channel_stack_builder *builder, void *arg) { return grpc_channel_stack_builder_append_filter( builder, (const grpc_channel_filter *)arg, NULL, NULL); @@ -79,7 +75,7 @@ static bool set_default_host_if_unset(grpc_channel_stack_builder *builder, void grpc_client_config_init(void) { grpc_lb_policy_registry_init(); - grpc_resolver_registry_init(GRPC_DEFAULT_NAME_PREFIX); + grpc_resolver_registry_init(); grpc_subchannel_index_init(); grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MIN, set_default_host_if_unset, NULL); diff --git a/src/core/ext/client_config/http_connect_handshaker.c b/src/core/ext/client_config/http_connect_handshaker.c new file mode 100644 index 0000000000..fda1df173e --- /dev/null +++ b/src/core/ext/client_config/http_connect_handshaker.c @@ -0,0 +1,275 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/client_config/http_connect_handshaker.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string_util.h> + +#include "src/core/ext/client_config/uri_parser.h" +#include "src/core/lib/http/format_request.h" +#include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/support/env.h" + +typedef struct http_connect_handshaker { + // Base class. Must be first. + grpc_handshaker base; + + char* proxy_server; + char* server_name; + + // State saved while performing the handshake. + grpc_endpoint* endpoint; + grpc_channel_args* args; + grpc_handshaker_done_cb cb; + void* user_data; + + // Objects for processing the HTTP CONNECT request and response. + gpr_slice_buffer write_buffer; + gpr_slice_buffer* read_buffer; // Ownership passes through this object. + grpc_closure request_done_closure; + grpc_closure response_read_closure; + grpc_http_parser http_parser; + grpc_http_response http_response; + grpc_timer timeout_timer; + + gpr_refcount refcount; +} http_connect_handshaker; + +// Unref and clean up handshaker. +static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { + if (gpr_unref(&handshaker->refcount)) { + gpr_free(handshaker->proxy_server); + gpr_free(handshaker->server_name); + gpr_slice_buffer_destroy(&handshaker->write_buffer); + grpc_http_parser_destroy(&handshaker->http_parser); + grpc_http_response_destroy(&handshaker->http_response); + gpr_free(handshaker); + } +} + +// Callback invoked when deadline is exceeded. +static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + http_connect_handshaker* handshaker = arg; + if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. + grpc_endpoint_shutdown(exec_ctx, handshaker->endpoint); + } + http_connect_handshaker_unref(handshaker); +} + +// Callback invoked when finished writing HTTP CONNECT request. +static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + http_connect_handshaker* handshaker = arg; + if (error != GRPC_ERROR_NONE) { + // If the write failed, invoke the callback immediately with the error. + handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args, + handshaker->read_buffer, handshaker->user_data, + GRPC_ERROR_REF(error)); + } else { + // Otherwise, read the response. + grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, + &handshaker->response_read_closure); + } +} + +// Callback invoked for reading HTTP CONNECT response. +static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + http_connect_handshaker* handshaker = arg; + if (error != GRPC_ERROR_NONE) { + GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback. + goto done; + } + // Add buffer to parser. + for (size_t i = 0; i < handshaker->read_buffer->count; ++i) { + if (GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) { + size_t body_start_offset = 0; + error = grpc_http_parser_parse(&handshaker->http_parser, + handshaker->read_buffer->slices[i], + &body_start_offset); + if (error != GRPC_ERROR_NONE) goto done; + if (handshaker->http_parser.state == GRPC_HTTP_BODY) { + // We've gotten back a successul response, so stop the timeout timer. + grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer); + // Remove the data we've already read from the read buffer, + // leaving only the leftover bytes (if any). + gpr_slice_buffer tmp_buffer; + gpr_slice_buffer_init(&tmp_buffer); + if (body_start_offset < + GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i])) { + gpr_slice_buffer_add( + &tmp_buffer, + gpr_slice_split_tail(&handshaker->read_buffer->slices[i], + body_start_offset)); + } + gpr_slice_buffer_addn(&tmp_buffer, + &handshaker->read_buffer->slices[i + 1], + handshaker->read_buffer->count - i - 1); + gpr_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer); + gpr_slice_buffer_destroy(&tmp_buffer); + break; + } + } + } + // If we're not done reading the response, read more data. + // TODO(roth): In practice, I suspect that the response to a CONNECT + // request will never include a body, in which case this check is + // sufficient. However, the language of RFC-2817 doesn't explicitly + // forbid the response from including a body. If there is a body, + // it's possible that we might have parsed part but not all of the + // body, in which case this check will cause us to fail to parse the + // remainder of the body. If that ever becomes an issue, we may + // need to fix the HTTP parser to understand when the body is + // complete (e.g., handling chunked transfer encoding or looking + // at the Content-Length: header). + if (handshaker->http_parser.state != GRPC_HTTP_BODY) { + gpr_slice_buffer_reset_and_unref(handshaker->read_buffer); + grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, + &handshaker->response_read_closure); + return; + } + // Make sure we got a 2xx response. + if (handshaker->http_response.status < 200 || + handshaker->http_response.status >= 300) { + char* msg; + gpr_asprintf(&msg, "HTTP proxy returned response code %d", + handshaker->http_response.status); + error = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + } +done: + // Invoke handshake-done callback. + handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args, + handshaker->read_buffer, handshaker->user_data, error); +} + +// +// Public handshaker methods +// + +static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker_in) { + http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; + http_connect_handshaker_unref(handshaker); +} + +static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) {} + +static void http_connect_handshaker_do_handshake( + grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in, + grpc_endpoint* endpoint, grpc_channel_args* args, + gpr_slice_buffer* read_buffer, gpr_timespec deadline, + grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, + void* user_data) { + http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; + // Save state in the handshaker object. + handshaker->endpoint = endpoint; + handshaker->args = args; + handshaker->cb = cb; + handshaker->user_data = user_data; + handshaker->read_buffer = read_buffer; + // Send HTTP CONNECT request. + gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", + handshaker->server_name, handshaker->proxy_server); + grpc_httpcli_request request; + memset(&request, 0, sizeof(request)); + request.host = handshaker->proxy_server; + request.http.method = "CONNECT"; + request.http.path = handshaker->server_name; + request.handshaker = &grpc_httpcli_plaintext; + gpr_slice request_slice = grpc_httpcli_format_connect_request(&request); + gpr_slice_buffer_add(&handshaker->write_buffer, request_slice); + grpc_endpoint_write(exec_ctx, endpoint, &handshaker->write_buffer, + &handshaker->request_done_closure); + // Set timeout timer. The timer gets a reference to the handshaker. + gpr_ref(&handshaker->refcount); + grpc_timer_init(exec_ctx, &handshaker->timeout_timer, + gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + on_timeout, handshaker, gpr_now(GPR_CLOCK_MONOTONIC)); +} + +static const struct grpc_handshaker_vtable http_connect_handshaker_vtable = { + http_connect_handshaker_destroy, http_connect_handshaker_shutdown, + http_connect_handshaker_do_handshake}; + +grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, + const char* server_name) { + GPR_ASSERT(proxy_server != NULL); + GPR_ASSERT(server_name != NULL); + http_connect_handshaker* handshaker = + gpr_malloc(sizeof(http_connect_handshaker)); + memset(handshaker, 0, sizeof(*handshaker)); + grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base); + handshaker->proxy_server = gpr_strdup(proxy_server); + handshaker->server_name = gpr_strdup(server_name); + gpr_slice_buffer_init(&handshaker->write_buffer); + grpc_closure_init(&handshaker->request_done_closure, on_write_done, + handshaker); + grpc_closure_init(&handshaker->response_read_closure, on_read_done, + handshaker); + grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE, + &handshaker->http_response); + gpr_ref_init(&handshaker->refcount, 1); + return &handshaker->base; +} + +char* grpc_get_http_proxy_server() { + char* uri_str = gpr_getenv("http_proxy"); + if (uri_str == NULL) return NULL; + grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */); + char* proxy_name = NULL; + if (uri == NULL || uri->authority == NULL) { + gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var"); + goto done; + } + if (strcmp(uri->scheme, "http") != 0) { + gpr_log(GPR_ERROR, "'%s' scheme not supported in proxy URI", uri->scheme); + goto done; + } + if (strchr(uri->authority, '@') != NULL) { + gpr_log(GPR_ERROR, "userinfo not supported in proxy URI"); + goto done; + } + proxy_name = gpr_strdup(uri->authority); +done: + gpr_free(uri_str); + grpc_uri_destroy(uri); + return proxy_name; +} diff --git a/src/core/ext/client_config/subchannel_factory.c b/src/core/ext/client_config/http_connect_handshaker.h index d1e4d75a02..1fc3948267 100644 --- a/src/core/ext/client_config/subchannel_factory.c +++ b/src/core/ext/client_config/http_connect_handshaker.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,19 +31,17 @@ * */ -#include "src/core/ext/client_config/subchannel_factory.h" +#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H +#define GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H -void grpc_subchannel_factory_ref(grpc_subchannel_factory* factory) { - factory->vtable->ref(factory); -} +#include "src/core/lib/channel/handshaker.h" -void grpc_subchannel_factory_unref(grpc_exec_ctx* exec_ctx, - grpc_subchannel_factory* factory) { - factory->vtable->unref(exec_ctx, factory); -} +/// Does NOT take ownership of \a proxy_server or \a server_name. +grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, + const char* server_name); -grpc_subchannel* grpc_subchannel_factory_create_subchannel( - grpc_exec_ctx* exec_ctx, grpc_subchannel_factory* factory, - grpc_subchannel_args* args) { - return factory->vtable->create_subchannel(exec_ctx, factory, args); -} +/// Returns the name of the proxy to use, or NULL if no proxy is configured. +/// Caller takes ownership of result. +char* grpc_get_http_proxy_server(); + +#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H */ diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c index 903563ef6b..46391272a6 100644 --- a/src/core/ext/client_config/lb_policy.c +++ b/src/core/ext/client_config/lb_policy.c @@ -108,16 +108,18 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target) { - policy->vtable->cancel_pick(exec_ctx, policy, target); + grpc_connected_subchannel **target, + grpc_error *error) { + policy->vtable->cancel_pick(exec_ctx, policy, target, error); } void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq) { + uint32_t initial_metadata_flags_eq, + grpc_error *error) { policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask, - initial_metadata_flags_eq); + initial_metadata_flags_eq, error); } void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index ac387e4b80..ef228d0ebc 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -56,10 +56,14 @@ struct grpc_lb_policy { typedef struct grpc_lb_policy_pick_args { /** Initial metadata associated with the picking call. */ grpc_metadata_batch *initial_metadata; - /** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */ + /** Bitmask used for selective cancelling. See \a + * grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in + * grpc_types.h */ uint32_t initial_metadata_flags; /** Storage for LB token in \a initial_metadata, or NULL if not used */ grpc_linked_mdelem *lb_token_mdelem_storage; + /** Deadline for the call to the LB server */ + gpr_timespec deadline; } grpc_lb_policy_pick_args; struct grpc_lb_policy_vtable { @@ -74,12 +78,12 @@ struct grpc_lb_policy_vtable { /** \see grpc_lb_policy_cancel_pick */ void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target); + grpc_connected_subchannel **target, grpc_error *error); /** \see grpc_lb_policy_cancel_picks */ void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq); + uint32_t initial_metadata_flags_eq, grpc_error *error); /** \see grpc_lb_policy_ping_one */ void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, @@ -159,7 +163,8 @@ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, The \a on_complete callback of the pending picks will be invoked with \a *target set to NULL. */ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target); + grpc_connected_subchannel **target, + grpc_error *error); /** Cancel all pending picks for which their \a initial_metadata_flags (as given in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq @@ -167,7 +172,8 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq); + uint32_t initial_metadata_flags_eq, + grpc_error *error); /** Try to enter a READY connectivity state */ void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); diff --git a/src/core/ext/client_config/lb_policy_factory.c b/src/core/ext/client_config/lb_policy_factory.c index f86117aa38..c17af91a09 100644 --- a/src/core/ext/client_config/lb_policy_factory.c +++ b/src/core/ext/client_config/lb_policy_factory.c @@ -34,6 +34,7 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> #include "src/core/ext/client_config/lb_policy_factory.h" @@ -46,6 +47,25 @@ grpc_lb_addresses* grpc_lb_addresses_create(size_t num_addresses) { return addresses; } +grpc_lb_addresses* grpc_lb_addresses_copy(grpc_lb_addresses* addresses, + void* (*user_data_copy)(void*)) { + grpc_lb_addresses* new_addresses = + grpc_lb_addresses_create(addresses->num_addresses); + memcpy(new_addresses->addresses, addresses->addresses, + sizeof(grpc_lb_address) * addresses->num_addresses); + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (new_addresses->addresses[i].balancer_name != NULL) { + new_addresses->addresses[i].balancer_name = + gpr_strdup(new_addresses->addresses[i].balancer_name); + } + if (user_data_copy != NULL) { + new_addresses->addresses[i].user_data = + user_data_copy(new_addresses->addresses[i].user_data); + } + } + return new_addresses; +} + void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index, void* address, size_t address_len, bool is_balancer, char* balancer_name, diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h index b31179cf80..ade55704f2 100644 --- a/src/core/ext/client_config/lb_policy_factory.h +++ b/src/core/ext/client_config/lb_policy_factory.h @@ -36,9 +36,9 @@ #include "src/core/ext/client_config/client_channel_factory.h" #include "src/core/ext/client_config/lb_policy.h" -#include "src/core/ext/client_config/resolver_result.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/resolve_address.h" typedef struct grpc_lb_policy_factory grpc_lb_policy_factory; typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; @@ -68,6 +68,11 @@ typedef struct grpc_lb_addresses { * \a num_addresses addresses. */ grpc_lb_addresses *grpc_lb_addresses_create(size_t num_addresses); +/** Creates a copy of \a addresses. If \a user_data_copy is not NULL, + * it will be invoked to copy the \a user_data field of each address. */ +grpc_lb_addresses *grpc_lb_addresses_copy(grpc_lb_addresses *addresses, + void *(*user_data_copy)(void *)); + /** Sets the value of the address at index \a index of \a addresses. * \a address is a socket address of length \a address_len. * Takes ownership of \a balancer_name. */ @@ -82,9 +87,14 @@ void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses, void (*user_data_destroy)(void *)); /** Arguments passed to LB policies. */ +/* TODO(roth, ctiller): Consider replacing this struct with + grpc_channel_args. See comment in resolver_result.h for details. */ typedef struct grpc_lb_policy_args { + const char *server_name; grpc_lb_addresses *addresses; grpc_client_channel_factory *client_channel_factory; + /* Can be used to pass implementation-specific parameters to the LB policy. */ + grpc_channel_args *additional_args; } grpc_lb_policy_args; struct grpc_lb_policy_factory_vtable { diff --git a/src/core/ext/client_config/resolver_factory.h b/src/core/ext/client_config/resolver_factory.h index f69bf79564..9ec5b9a70e 100644 --- a/src/core/ext/client_config/resolver_factory.h +++ b/src/core/ext/client_config/resolver_factory.h @@ -47,10 +47,7 @@ struct grpc_resolver_factory { const grpc_resolver_factory_vtable *vtable; }; -typedef struct grpc_resolver_args { - grpc_uri *uri; - grpc_client_channel_factory *client_channel_factory; -} grpc_resolver_args; +typedef struct grpc_resolver_args { grpc_uri *uri; } grpc_resolver_args; struct grpc_resolver_factory_vtable { void (*ref)(grpc_resolver_factory *factory); diff --git a/src/core/ext/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c index e7a4abd568..bd5c683878 100644 --- a/src/core/ext/client_config/resolver_registry.c +++ b/src/core/ext/client_config/resolver_registry.c @@ -40,22 +40,20 @@ #include <grpc/support/string_util.h> #define MAX_RESOLVERS 10 +#define DEFAULT_RESOLVER_PREFIX_MAX_LENGTH 32 static grpc_resolver_factory *g_all_of_the_resolvers[MAX_RESOLVERS]; static int g_number_of_resolvers = 0; -static char *g_default_resolver_prefix; +static char g_default_resolver_prefix[DEFAULT_RESOLVER_PREFIX_MAX_LENGTH] = + "dns:///"; -void grpc_resolver_registry_init(const char *default_resolver_prefix) { - g_default_resolver_prefix = gpr_strdup(default_resolver_prefix); -} +void grpc_resolver_registry_init() {} void grpc_resolver_registry_shutdown(void) { - int i; - for (i = 0; i < g_number_of_resolvers; i++) { + for (int i = 0; i < g_number_of_resolvers; i++) { grpc_resolver_factory_unref(g_all_of_the_resolvers[i]); } - gpr_free(g_default_resolver_prefix); // FIXME(ctiller): this should live in grpc_resolver_registry_init, // however that would have the client_config plugin call this AFTER we start // registering resolvers from third party plugins, and so they'd never show @@ -65,6 +63,17 @@ void grpc_resolver_registry_shutdown(void) { g_number_of_resolvers = 0; } +void grpc_resolver_registry_set_default_prefix( + const char *default_resolver_prefix) { + const size_t len = strlen(default_resolver_prefix); + GPR_ASSERT(len < DEFAULT_RESOLVER_PREFIX_MAX_LENGTH && + "default resolver prefix too long"); + GPR_ASSERT(len > 0 && "default resolver prefix can't be empty"); + // By the previous assert, default_resolver_prefix is safe to be copied with a + // plain strcpy. + strcpy(g_default_resolver_prefix, default_resolver_prefix); +} + void grpc_register_resolver_type(grpc_resolver_factory *factory) { int i; for (i = 0; i < g_number_of_resolvers; i++) { @@ -108,35 +117,27 @@ static grpc_resolver_factory *resolve_factory(const char *target, *uri = grpc_uri_parse(target, 1); factory = lookup_factory_by_uri(*uri); if (factory == NULL) { - if (g_default_resolver_prefix != NULL) { - grpc_uri_destroy(*uri); - gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target); - *uri = grpc_uri_parse(tmp, 1); - factory = lookup_factory_by_uri(*uri); - if (factory == NULL) { - grpc_uri_destroy(grpc_uri_parse(target, 0)); - grpc_uri_destroy(grpc_uri_parse(tmp, 0)); - gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, - tmp); - } - gpr_free(tmp); - } else { + grpc_uri_destroy(*uri); + gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target); + *uri = grpc_uri_parse(tmp, 1); + factory = lookup_factory_by_uri(*uri); + if (factory == NULL) { grpc_uri_destroy(grpc_uri_parse(target, 0)); - gpr_log(GPR_ERROR, "don't know how to resolve '%s'", target); + grpc_uri_destroy(grpc_uri_parse(tmp, 0)); + gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, tmp); } + gpr_free(tmp); } return factory; } -grpc_resolver *grpc_resolver_create( - const char *target, grpc_client_channel_factory *client_channel_factory) { +grpc_resolver *grpc_resolver_create(const char *target) { grpc_uri *uri = NULL; grpc_resolver_factory *factory = resolve_factory(target, &uri); grpc_resolver *resolver; grpc_resolver_args args; memset(&args, 0, sizeof(args)); args.uri = uri; - args.client_channel_factory = client_channel_factory; resolver = grpc_resolver_factory_create_resolver(factory, &args); grpc_uri_destroy(uri); return resolver; diff --git a/src/core/ext/client_config/resolver_registry.h b/src/core/ext/client_config/resolver_registry.h index 5ef1383cd3..4c6279b978 100644 --- a/src/core/ext/client_config/resolver_registry.h +++ b/src/core/ext/client_config/resolver_registry.h @@ -36,9 +36,12 @@ #include "src/core/ext/client_config/resolver_factory.h" -void grpc_resolver_registry_init(const char *default_prefix); +void grpc_resolver_registry_init(); void grpc_resolver_registry_shutdown(void); +/** Set the default URI prefix to \a default_prefix. */ +void grpc_resolver_registry_set_default_prefix(const char *default_prefix); + /** Register a resolver type. URI's of \a scheme will be resolved with the given resolver. If \a priority is greater than zero, then the resolver will be eligible @@ -55,8 +58,7 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); If a resolver factory was found, use it to instantiate a resolver and return it. If a resolver factory was not found, return NULL. */ -grpc_resolver *grpc_resolver_create( - const char *target, grpc_client_channel_factory *client_channel_factory); +grpc_resolver *grpc_resolver_create(const char *target); /** Find a resolver factory given a name and return an (owned-by-the-caller) * reference to it */ diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c index 242989a0da..63480d152b 100644 --- a/src/core/ext/client_config/resolver_result.c +++ b/src/core/ext/client_config/resolver_result.c @@ -34,40 +34,61 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/channel/channel_args.h" struct grpc_resolver_result { gpr_refcount refs; - grpc_lb_policy* lb_policy; + char* server_name; + grpc_lb_addresses* addresses; + char* lb_policy_name; + grpc_channel_args* lb_policy_args; }; -grpc_resolver_result* grpc_resolver_result_create() { - grpc_resolver_result* c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); - gpr_ref_init(&c->refs, 1); - return c; +grpc_resolver_result* grpc_resolver_result_create( + const char* server_name, grpc_lb_addresses* addresses, + const char* lb_policy_name, grpc_channel_args* lb_policy_args) { + grpc_resolver_result* result = gpr_malloc(sizeof(*result)); + memset(result, 0, sizeof(*result)); + gpr_ref_init(&result->refs, 1); + result->server_name = gpr_strdup(server_name); + result->addresses = addresses; + result->lb_policy_name = gpr_strdup(lb_policy_name); + result->lb_policy_args = lb_policy_args; + return result; } -void grpc_resolver_result_ref(grpc_resolver_result* c) { gpr_ref(&c->refs); } +void grpc_resolver_result_ref(grpc_resolver_result* result) { + gpr_ref(&result->refs); +} void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, - grpc_resolver_result* c) { - if (gpr_unref(&c->refs)) { - if (c->lb_policy != NULL) { - GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "resolver_result"); - } - gpr_free(c); + grpc_resolver_result* result) { + if (gpr_unref(&result->refs)) { + gpr_free(result->server_name); + grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */); + gpr_free(result->lb_policy_name); + grpc_channel_args_destroy(result->lb_policy_args); + gpr_free(result); } } -void grpc_resolver_result_set_lb_policy(grpc_resolver_result* c, - grpc_lb_policy* lb_policy) { - GPR_ASSERT(c->lb_policy == NULL); - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "resolver_result"); - } - c->lb_policy = lb_policy; +const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result) { + return result->server_name; +} + +grpc_lb_addresses* grpc_resolver_result_get_addresses( + grpc_resolver_result* result) { + return result->addresses; +} + +const char* grpc_resolver_result_get_lb_policy_name( + grpc_resolver_result* result) { + return result->lb_policy_name; } -grpc_lb_policy* grpc_resolver_result_get_lb_policy(grpc_resolver_result* c) { - return c->lb_policy; +grpc_channel_args* grpc_resolver_result_get_lb_policy_args( + grpc_resolver_result* result) { + return result->lb_policy_args; } diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h index 5a69d81990..414c2e2482 100644 --- a/src/core/ext/client_config/resolver_result.h +++ b/src/core/ext/client_config/resolver_result.h @@ -32,22 +32,43 @@ #ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H #define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H -#include <stdbool.h> - -#include "src/core/ext/client_config/lb_policy.h" +#include "src/core/ext/client_config/lb_policy_factory.h" #include "src/core/lib/iomgr/resolve_address.h" +// TODO(roth, ctiller): In the long term, we are considering replacing +// the resolver_result data structure with grpc_channel_args. The idea is +// that the resolver will return a set of channel args that contains the +// information that is currently in the resolver_result struct. For +// example, there will be specific args indicating the set of addresses +// and the name of the LB policy to instantiate. Note that if we did +// this, we would probably want to change the data structure of +// grpc_channel_args such to a hash table or AVL or some other data +// structure that does not require linear search to find keys. + /// Results reported from a grpc_resolver. typedef struct grpc_resolver_result grpc_resolver_result; -grpc_resolver_result* grpc_resolver_result_create(); +/// Takes ownership of \a addresses and \a lb_policy_args. +grpc_resolver_result* grpc_resolver_result_create( + const char* server_name, grpc_lb_addresses* addresses, + const char* lb_policy_name, grpc_channel_args* lb_policy_args); void grpc_resolver_result_ref(grpc_resolver_result* result); void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, grpc_resolver_result* result); -void grpc_resolver_result_set_lb_policy(grpc_resolver_result* result, - grpc_lb_policy* lb_policy); -grpc_lb_policy* grpc_resolver_result_get_lb_policy( +/// Caller does NOT take ownership of result. +const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result); + +/// Caller does NOT take ownership of result. +grpc_lb_addresses* grpc_resolver_result_get_addresses( + grpc_resolver_result* result); + +/// Caller does NOT take ownership of result. +const char* grpc_resolver_result_get_lb_policy_name( + grpc_resolver_result* result); + +/// Caller does NOT take ownership of result. +grpc_channel_args* grpc_resolver_result_get_lb_policy_args( grpc_resolver_result* result); #endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */ diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 1141844634..0bbaa3e382 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -220,8 +220,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_STREAM_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, - old_val + delta, reason); + "SUBCHANNEL: %p %12s 0x%08d -> 0x%08d [%s]", c, purpose, (int)old_val, + (int)(old_val + delta), reason); #endif return old_val; } @@ -332,36 +332,40 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); - gpr_backoff_init(&c->backoff_state, - GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_SUBCHANNEL_RECONNECT_JITTER, - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + int initial_backoff_ms = + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; + int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + bool fixed_reconnect_backoff = false; if (c->args) { for (size_t i = 0; i < c->args->num_args; i++) { if (0 == strcmp(c->args->args[i].key, "grpc.testing.fixed_reconnect_backoff")) { GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); - gpr_backoff_init(&c->backoff_state, 1.0, 0.0, - c->args->args[i].value.integer, - c->args->args[i].value.integer); - } - if (0 == - strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { - const grpc_integer_options options = {-1, 0, INT_MAX}; - const int value = - grpc_channel_arg_get_integer(&c->args->args[i], options); - if (value >= 0) { - gpr_backoff_init( - &c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_SUBCHANNEL_RECONNECT_JITTER, - GPR_MIN(value, - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000), - value); - } + fixed_reconnect_backoff = true; + initial_backoff_ms = max_backoff_ms = grpc_channel_arg_get_integer( + &c->args->args[i], + (grpc_integer_options){initial_backoff_ms, 100, INT_MAX}); + } else if (0 == strcmp(c->args->args[i].key, + GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + max_backoff_ms = grpc_channel_arg_get_integer( + &c->args->args[i], + (grpc_integer_options){max_backoff_ms, 100, INT_MAX}); + } else if (0 == strcmp(c->args->args[i].key, + GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + initial_backoff_ms = grpc_channel_arg_get_integer( + &c->args->args[i], + (grpc_integer_options){initial_backoff_ms, 100, INT_MAX}); } } } + gpr_backoff_init( + &c->backoff_state, + fixed_reconnect_backoff ? 1.0 + : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, + fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER, + initial_backoff_ms, max_backoff_ms); gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -629,7 +633,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, c->have_alarm = 1; grpc_connectivity_state_set( exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1), + grpc_error_set_int( + GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), "connect_failed"); gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now); const char *errmsg = grpc_error_string(error); @@ -698,14 +704,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, grpc_subchannel_call **call) { + grpc_polling_entity *pollent, gpr_timespec deadline, + grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. grpc_error *error = grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, callstk); + NULL, NULL, deadline, callstk); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h index ae1d96e640..3330621071 100644 --- a/src/core/ext/client_config/subchannel.h +++ b/src/core/ext/client_config/subchannel.h @@ -110,7 +110,8 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, /** construct a subchannel call */ grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, grpc_subchannel_call **subchannel_call); + grpc_polling_entity *pollent, gpr_timespec deadline, + grpc_subchannel_call **subchannel_call); /** process a transport level op */ void grpc_connected_subchannel_process_transport_op( @@ -162,6 +163,8 @@ struct grpc_subchannel_args { size_t filter_count; /** Channel arguments to be supplied to the newly created channel */ const grpc_channel_args *args; + /** Server name */ + const char *server_name; /** Address to connect to */ struct sockaddr *addr; size_t addr_len; diff --git a/src/core/ext/client_config/subchannel_factory.h b/src/core/ext/client_config/subchannel_factory.h deleted file mode 100644 index 0fb806d081..0000000000 --- a/src/core/ext/client_config/subchannel_factory.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H - -#include "src/core/ext/client_config/subchannel.h" -#include "src/core/lib/channel/channel_stack.h" - -typedef struct grpc_subchannel_factory grpc_subchannel_factory; -typedef struct grpc_subchannel_factory_vtable grpc_subchannel_factory_vtable; - -/** Constructor for new configured channels. - Creating decorators around this type is encouraged to adapt behavior. */ -struct grpc_subchannel_factory { - const grpc_subchannel_factory_vtable *vtable; -}; - -struct grpc_subchannel_factory_vtable { - void (*ref)(grpc_subchannel_factory *factory); - void (*unref)(grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory); - grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx, - grpc_subchannel_factory *factory, - grpc_subchannel_args *args); -}; - -void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory); -void grpc_subchannel_factory_unref(grpc_exec_ctx *exec_ctx, - grpc_subchannel_factory *factory); - -/** Create a new grpc_subchannel */ -grpc_subchannel *grpc_subchannel_factory_create_subchannel( - grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory, - grpc_subchannel_args *args); - -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */ diff --git a/src/core/ext/client_config/subchannel_index.c b/src/core/ext/client_config/subchannel_index.c index 690cb16b96..673f85b8cb 100644 --- a/src/core/ext/client_config/subchannel_index.c +++ b/src/core/ext/client_config/subchannel_index.c @@ -38,6 +38,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/avl.h> +#include <grpc/support/string_util.h> #include <grpc/support/tls.h> #include "src/core/lib/channel/channel_args.h" @@ -85,6 +86,7 @@ static grpc_subchannel_key *create_key( } else { k->args.filters = NULL; } + k->args.server_name = gpr_strdup(args->server_name); k->args.addr_len = args->addr_len; k->args.addr = gpr_malloc(args->addr_len); if (k->args.addr_len > 0) { @@ -111,6 +113,8 @@ static int subchannel_key_compare(grpc_subchannel_key *a, if (c != 0) return c; c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; + c = strcmp(a->args.server_name, b->args.server_name); + if (c != 0) return c; if (a->args.addr_len) { c = memcmp(a->args.addr, b->args.addr, a->args.addr_len); if (c != 0) return c; @@ -126,9 +130,10 @@ static int subchannel_key_compare(grpc_subchannel_key *a, void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *k) { grpc_connector_unref(exec_ctx, k->connector); - gpr_free(k->args.addr); gpr_free((grpc_channel_args *)k->args.filters); grpc_channel_args_destroy((grpc_channel_args *)k->args.args); + gpr_free((void *)k->args.server_name); + gpr_free(k->args.addr); gpr_free(k); } |