diff options
Diffstat (limited to 'src/core')
25 files changed, 445 insertions, 54 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 63797bfa1e..b2b4fea83c 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -177,6 +177,8 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, if (chand->resolver_result != NULL) { grpc_lb_policy_args lb_policy_args; + lb_policy_args.server_name = + grpc_resolver_result_get_server_name(chand->resolver_result); lb_policy_args.addresses = grpc_resolver_result_get_addresses(chand->resolver_result); lb_policy_args.additional_args = diff --git a/src/core/ext/client_config/http_connect_handshaker.c b/src/core/ext/client_config/http_connect_handshaker.c new file mode 100644 index 0000000000..fda1df173e --- /dev/null +++ b/src/core/ext/client_config/http_connect_handshaker.c @@ -0,0 +1,275 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/client_config/http_connect_handshaker.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string_util.h> + +#include "src/core/ext/client_config/uri_parser.h" +#include "src/core/lib/http/format_request.h" +#include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/support/env.h" + +typedef struct http_connect_handshaker { + // Base class. Must be first. + grpc_handshaker base; + + char* proxy_server; + char* server_name; + + // State saved while performing the handshake. + grpc_endpoint* endpoint; + grpc_channel_args* args; + grpc_handshaker_done_cb cb; + void* user_data; + + // Objects for processing the HTTP CONNECT request and response. + gpr_slice_buffer write_buffer; + gpr_slice_buffer* read_buffer; // Ownership passes through this object. + grpc_closure request_done_closure; + grpc_closure response_read_closure; + grpc_http_parser http_parser; + grpc_http_response http_response; + grpc_timer timeout_timer; + + gpr_refcount refcount; +} http_connect_handshaker; + +// Unref and clean up handshaker. +static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { + if (gpr_unref(&handshaker->refcount)) { + gpr_free(handshaker->proxy_server); + gpr_free(handshaker->server_name); + gpr_slice_buffer_destroy(&handshaker->write_buffer); + grpc_http_parser_destroy(&handshaker->http_parser); + grpc_http_response_destroy(&handshaker->http_response); + gpr_free(handshaker); + } +} + +// Callback invoked when deadline is exceeded. +static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + http_connect_handshaker* handshaker = arg; + if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. + grpc_endpoint_shutdown(exec_ctx, handshaker->endpoint); + } + http_connect_handshaker_unref(handshaker); +} + +// Callback invoked when finished writing HTTP CONNECT request. +static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + http_connect_handshaker* handshaker = arg; + if (error != GRPC_ERROR_NONE) { + // If the write failed, invoke the callback immediately with the error. + handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args, + handshaker->read_buffer, handshaker->user_data, + GRPC_ERROR_REF(error)); + } else { + // Otherwise, read the response. + grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, + &handshaker->response_read_closure); + } +} + +// Callback invoked for reading HTTP CONNECT response. +static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + http_connect_handshaker* handshaker = arg; + if (error != GRPC_ERROR_NONE) { + GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback. + goto done; + } + // Add buffer to parser. + for (size_t i = 0; i < handshaker->read_buffer->count; ++i) { + if (GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) { + size_t body_start_offset = 0; + error = grpc_http_parser_parse(&handshaker->http_parser, + handshaker->read_buffer->slices[i], + &body_start_offset); + if (error != GRPC_ERROR_NONE) goto done; + if (handshaker->http_parser.state == GRPC_HTTP_BODY) { + // We've gotten back a successul response, so stop the timeout timer. + grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer); + // Remove the data we've already read from the read buffer, + // leaving only the leftover bytes (if any). + gpr_slice_buffer tmp_buffer; + gpr_slice_buffer_init(&tmp_buffer); + if (body_start_offset < + GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i])) { + gpr_slice_buffer_add( + &tmp_buffer, + gpr_slice_split_tail(&handshaker->read_buffer->slices[i], + body_start_offset)); + } + gpr_slice_buffer_addn(&tmp_buffer, + &handshaker->read_buffer->slices[i + 1], + handshaker->read_buffer->count - i - 1); + gpr_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer); + gpr_slice_buffer_destroy(&tmp_buffer); + break; + } + } + } + // If we're not done reading the response, read more data. + // TODO(roth): In practice, I suspect that the response to a CONNECT + // request will never include a body, in which case this check is + // sufficient. However, the language of RFC-2817 doesn't explicitly + // forbid the response from including a body. If there is a body, + // it's possible that we might have parsed part but not all of the + // body, in which case this check will cause us to fail to parse the + // remainder of the body. If that ever becomes an issue, we may + // need to fix the HTTP parser to understand when the body is + // complete (e.g., handling chunked transfer encoding or looking + // at the Content-Length: header). + if (handshaker->http_parser.state != GRPC_HTTP_BODY) { + gpr_slice_buffer_reset_and_unref(handshaker->read_buffer); + grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, + &handshaker->response_read_closure); + return; + } + // Make sure we got a 2xx response. + if (handshaker->http_response.status < 200 || + handshaker->http_response.status >= 300) { + char* msg; + gpr_asprintf(&msg, "HTTP proxy returned response code %d", + handshaker->http_response.status); + error = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + } +done: + // Invoke handshake-done callback. + handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args, + handshaker->read_buffer, handshaker->user_data, error); +} + +// +// Public handshaker methods +// + +static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker_in) { + http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; + http_connect_handshaker_unref(handshaker); +} + +static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) {} + +static void http_connect_handshaker_do_handshake( + grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in, + grpc_endpoint* endpoint, grpc_channel_args* args, + gpr_slice_buffer* read_buffer, gpr_timespec deadline, + grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, + void* user_data) { + http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; + // Save state in the handshaker object. + handshaker->endpoint = endpoint; + handshaker->args = args; + handshaker->cb = cb; + handshaker->user_data = user_data; + handshaker->read_buffer = read_buffer; + // Send HTTP CONNECT request. + gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", + handshaker->server_name, handshaker->proxy_server); + grpc_httpcli_request request; + memset(&request, 0, sizeof(request)); + request.host = handshaker->proxy_server; + request.http.method = "CONNECT"; + request.http.path = handshaker->server_name; + request.handshaker = &grpc_httpcli_plaintext; + gpr_slice request_slice = grpc_httpcli_format_connect_request(&request); + gpr_slice_buffer_add(&handshaker->write_buffer, request_slice); + grpc_endpoint_write(exec_ctx, endpoint, &handshaker->write_buffer, + &handshaker->request_done_closure); + // Set timeout timer. The timer gets a reference to the handshaker. + gpr_ref(&handshaker->refcount); + grpc_timer_init(exec_ctx, &handshaker->timeout_timer, + gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + on_timeout, handshaker, gpr_now(GPR_CLOCK_MONOTONIC)); +} + +static const struct grpc_handshaker_vtable http_connect_handshaker_vtable = { + http_connect_handshaker_destroy, http_connect_handshaker_shutdown, + http_connect_handshaker_do_handshake}; + +grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, + const char* server_name) { + GPR_ASSERT(proxy_server != NULL); + GPR_ASSERT(server_name != NULL); + http_connect_handshaker* handshaker = + gpr_malloc(sizeof(http_connect_handshaker)); + memset(handshaker, 0, sizeof(*handshaker)); + grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base); + handshaker->proxy_server = gpr_strdup(proxy_server); + handshaker->server_name = gpr_strdup(server_name); + gpr_slice_buffer_init(&handshaker->write_buffer); + grpc_closure_init(&handshaker->request_done_closure, on_write_done, + handshaker); + grpc_closure_init(&handshaker->response_read_closure, on_read_done, + handshaker); + grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE, + &handshaker->http_response); + gpr_ref_init(&handshaker->refcount, 1); + return &handshaker->base; +} + +char* grpc_get_http_proxy_server() { + char* uri_str = gpr_getenv("http_proxy"); + if (uri_str == NULL) return NULL; + grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */); + char* proxy_name = NULL; + if (uri == NULL || uri->authority == NULL) { + gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var"); + goto done; + } + if (strcmp(uri->scheme, "http") != 0) { + gpr_log(GPR_ERROR, "'%s' scheme not supported in proxy URI", uri->scheme); + goto done; + } + if (strchr(uri->authority, '@') != NULL) { + gpr_log(GPR_ERROR, "userinfo not supported in proxy URI"); + goto done; + } + proxy_name = gpr_strdup(uri->authority); +done: + gpr_free(uri_str); + grpc_uri_destroy(uri); + return proxy_name; +} diff --git a/src/core/ext/client_config/http_connect_handshaker.h b/src/core/ext/client_config/http_connect_handshaker.h new file mode 100644 index 0000000000..1fc3948267 --- /dev/null +++ b/src/core/ext/client_config/http_connect_handshaker.h @@ -0,0 +1,47 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H +#define GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H + +#include "src/core/lib/channel/handshaker.h" + +/// Does NOT take ownership of \a proxy_server or \a server_name. +grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, + const char* server_name); + +/// Returns the name of the proxy to use, or NULL if no proxy is configured. +/// Caller takes ownership of result. +char* grpc_get_http_proxy_server(); + +#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H */ diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h index 54408c6308..ade55704f2 100644 --- a/src/core/ext/client_config/lb_policy_factory.h +++ b/src/core/ext/client_config/lb_policy_factory.h @@ -90,6 +90,7 @@ void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses, /* TODO(roth, ctiller): Consider replacing this struct with grpc_channel_args. See comment in resolver_result.h for details. */ typedef struct grpc_lb_policy_args { + const char *server_name; grpc_lb_addresses *addresses; grpc_client_channel_factory *client_channel_factory; /* Can be used to pass implementation-specific parameters to the LB policy. */ diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c index 59c9e7dc25..63480d152b 100644 --- a/src/core/ext/client_config/resolver_result.c +++ b/src/core/ext/client_config/resolver_result.c @@ -40,17 +40,19 @@ struct grpc_resolver_result { gpr_refcount refs; + char* server_name; grpc_lb_addresses* addresses; char* lb_policy_name; grpc_channel_args* lb_policy_args; }; grpc_resolver_result* grpc_resolver_result_create( - grpc_lb_addresses* addresses, const char* lb_policy_name, - grpc_channel_args* lb_policy_args) { + const char* server_name, grpc_lb_addresses* addresses, + const char* lb_policy_name, grpc_channel_args* lb_policy_args) { grpc_resolver_result* result = gpr_malloc(sizeof(*result)); memset(result, 0, sizeof(*result)); gpr_ref_init(&result->refs, 1); + result->server_name = gpr_strdup(server_name); result->addresses = addresses; result->lb_policy_name = gpr_strdup(lb_policy_name); result->lb_policy_args = lb_policy_args; @@ -64,6 +66,7 @@ void grpc_resolver_result_ref(grpc_resolver_result* result) { void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, grpc_resolver_result* result) { if (gpr_unref(&result->refs)) { + gpr_free(result->server_name); grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */); gpr_free(result->lb_policy_name); grpc_channel_args_destroy(result->lb_policy_args); @@ -71,6 +74,10 @@ void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, } } +const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result) { + return result->server_name; +} + grpc_lb_addresses* grpc_resolver_result_get_addresses( grpc_resolver_result* result) { return result->addresses; diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h index d4118b90e8..414c2e2482 100644 --- a/src/core/ext/client_config/resolver_result.h +++ b/src/core/ext/client_config/resolver_result.h @@ -50,13 +50,16 @@ typedef struct grpc_resolver_result grpc_resolver_result; /// Takes ownership of \a addresses and \a lb_policy_args. grpc_resolver_result* grpc_resolver_result_create( - grpc_lb_addresses* addresses, const char* lb_policy_name, - grpc_channel_args* lb_policy_args); + const char* server_name, grpc_lb_addresses* addresses, + const char* lb_policy_name, grpc_channel_args* lb_policy_args); void grpc_resolver_result_ref(grpc_resolver_result* result); void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, grpc_resolver_result* result); /// Caller does NOT take ownership of result. +const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result); + +/// Caller does NOT take ownership of result. grpc_lb_addresses* grpc_resolver_result_get_addresses( grpc_resolver_result* result); diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h index ae1d96e640..218bb43e0a 100644 --- a/src/core/ext/client_config/subchannel.h +++ b/src/core/ext/client_config/subchannel.h @@ -162,6 +162,8 @@ struct grpc_subchannel_args { size_t filter_count; /** Channel arguments to be supplied to the newly created channel */ const grpc_channel_args *args; + /** Server name */ + const char *server_name; /** Address to connect to */ struct sockaddr *addr; size_t addr_len; diff --git a/src/core/ext/client_config/subchannel_index.c b/src/core/ext/client_config/subchannel_index.c index 690cb16b96..673f85b8cb 100644 --- a/src/core/ext/client_config/subchannel_index.c +++ b/src/core/ext/client_config/subchannel_index.c @@ -38,6 +38,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/avl.h> +#include <grpc/support/string_util.h> #include <grpc/support/tls.h> #include "src/core/lib/channel/channel_args.h" @@ -85,6 +86,7 @@ static grpc_subchannel_key *create_key( } else { k->args.filters = NULL; } + k->args.server_name = gpr_strdup(args->server_name); k->args.addr_len = args->addr_len; k->args.addr = gpr_malloc(args->addr_len); if (k->args.addr_len > 0) { @@ -111,6 +113,8 @@ static int subchannel_key_compare(grpc_subchannel_key *a, if (c != 0) return c; c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; + c = strcmp(a->args.server_name, b->args.server_name); + if (c != 0) return c; if (a->args.addr_len) { c = memcmp(a->args.addr, b->args.addr, a->args.addr_len); if (c != 0) return c; @@ -126,9 +130,10 @@ static int subchannel_key_compare(grpc_subchannel_key *a, void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *k) { grpc_connector_unref(exec_ctx, k->connector); - gpr_free(k->args.addr); gpr_free((grpc_channel_args *)k->args.filters); grpc_channel_args_destroy((grpc_channel_args *)k->args.args); + gpr_free((void *)k->args.server_name); + gpr_free(k->args.addr); gpr_free(k); } diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 36db8ab00d..4ea164e639 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -283,6 +283,7 @@ typedef struct glb_lb_policy { /** mutex protecting remaining members */ gpr_mu mu; + const char *server_name; // Does not own. grpc_client_channel_factory *cc_factory; /** for communicating with the LB server */ @@ -438,6 +439,7 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, grpc_lb_policy_args args; memset(&args, 0, sizeof(args)); + args.server_name = glb_policy->server_name; args.client_channel_factory = glb_policy->cc_factory; args.addresses = process_serverlist(serverlist); @@ -563,6 +565,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, * policy is only instantiated and used in that case. * * Create a client channel over them to communicate with a LB service */ + glb_policy->server_name = args->server_name; glb_policy->cc_factory = args->client_channel_factory; GPR_ASSERT(glb_policy->cc_factory != NULL); diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 09df92dd99..466a0fdede 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -466,6 +466,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, } memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + sc_args.server_name = args->server_name; sc_args.addr = (struct sockaddr *)(&args->addresses->addresses[i].address.addr); sc_args.addr_len = args->addresses->addresses[i].address.len; diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 0feb0740a2..037f180a9e 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -629,6 +629,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, if (args->addresses->addresses[i].is_balancer) continue; memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + sc_args.server_name = args->server_name; sc_args.addr = (struct sockaddr *)(&args->addresses->addresses[i].address.addr); sc_args.addr_len = args->addresses->addresses[i].address.len; diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 1b85d2d634..e8ac1b12ae 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -37,6 +37,8 @@ #include <grpc/support/host_port.h> #include <grpc/support/string_util.h> +#include "src/core/ext/client_config/http_connect_handshaker.h" +#include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/ext/client_config/resolver_registry.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" @@ -53,8 +55,10 @@ typedef struct { grpc_resolver base; /** refcount */ gpr_refcount refs; - /** name to resolve */ - char *name; + /** target name */ + char *target_name; + /** name to resolve (usually the same as target_name) */ + char *name_to_resolve; /** default port to use */ char *default_port; /** load balancing policy name */ @@ -63,7 +67,7 @@ typedef struct { /** mutex guarding the rest of the state */ gpr_mu mu; /** are we currently resolving? */ - int resolving; + bool resolving; /** which version of the result have we published? */ int published_version; /** which version of the result is current? */ @@ -165,7 +169,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolver_result *result = NULL; gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving); - r->resolving = 0; + r->resolving = false; if (r->addresses != NULL) { grpc_lb_addresses *addresses = grpc_lb_addresses_create(r->addresses->naddrs); @@ -176,7 +180,8 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, NULL /* balancer_name */, NULL /* user_data */); } grpc_resolved_addresses_destroy(r->addresses); - result = grpc_resolver_result_create(addresses, r->lb_policy_name, NULL); + result = grpc_resolver_result_create(r->target_name, addresses, + r->lb_policy_name, NULL); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); @@ -211,9 +216,9 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, dns_resolver *r) { GRPC_RESOLVER_REF(&r->base, "dns-resolving"); GPR_ASSERT(!r->resolving); - r->resolving = 1; + r->resolving = true; r->addresses = NULL; - grpc_resolve_address(exec_ctx, r->name, r->default_port, + grpc_resolve_address(exec_ctx, r->name_to_resolve, r->default_port, grpc_closure_create(dns_on_resolved, r), &r->addresses); } @@ -237,7 +242,8 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { if (r->resolved_result) { grpc_resolver_result_unref(exec_ctx, r->resolved_result); } - gpr_free(r->name); + gpr_free(r->target_name); + gpr_free(r->name_to_resolve); gpr_free(r->default_port); gpr_free(r->lb_policy_name); gpr_free(r); @@ -246,22 +252,23 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { static grpc_resolver *dns_create(grpc_resolver_args *args, const char *default_port, const char *lb_policy_name) { - dns_resolver *r; - const char *path = args->uri->path; - if (0 != strcmp(args->uri->authority, "")) { gpr_log(GPR_ERROR, "authority based dns uri's not supported"); return NULL; } - + // Get name from args. + const char *path = args->uri->path; if (path[0] == '/') ++path; - - r = gpr_malloc(sizeof(dns_resolver)); + // Get proxy name, if any. + char *proxy_name = grpc_get_http_proxy_server(); + // Create resolver. + dns_resolver *r = gpr_malloc(sizeof(dns_resolver)); memset(r, 0, sizeof(*r)); gpr_ref_init(&r->refs, 1); gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &dns_resolver_vtable); - r->name = gpr_strdup(path); + r->target_name = gpr_strdup(path); + r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name; r->default_port = gpr_strdup(default_port); gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000); diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index ced4fb024c..74d2015e5c 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -121,7 +121,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, if (r->next_completion != NULL && !r->published) { r->published = true; *r->target_result = grpc_resolver_result_create( - grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */), + "", grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */), r->lb_policy_name, NULL); grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index ddc00bd79f..c2b59569fd 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -41,6 +41,7 @@ #include <grpc/support/slice_buffer.h> #include "src/core/ext/client_config/client_channel.h" +#include "src/core/ext/client_config/http_connect_handshaker.h" #include "src/core/ext/client_config/resolver_registry.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" @@ -189,6 +190,13 @@ static grpc_subchannel *client_channel_factory_create_subchannel( c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); c->handshake_mgr = grpc_handshake_manager_create(); + char *proxy_name = grpc_get_http_proxy_server(); + if (proxy_name != NULL) { + grpc_handshake_manager_add( + c->handshake_mgr, + grpc_http_connect_handshaker_create(proxy_name, args->server_name)); + gpr_free(proxy_name); + } args->args = final_args; s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index f36fbbfc57..31c54ff74c 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -41,6 +41,7 @@ #include <grpc/support/slice_buffer.h> #include "src/core/ext/client_config/client_channel.h" +#include "src/core/ext/client_config/http_connect_handshaker.h" #include "src/core/ext/client_config/resolver_registry.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" @@ -251,6 +252,13 @@ static grpc_subchannel *client_channel_factory_create_subchannel( c->base.vtable = &connector_vtable; c->security_connector = f->security_connector; c->handshake_mgr = grpc_handshake_manager_create(); + char *proxy_name = grpc_get_http_proxy_server(); + if (proxy_name != NULL) { + grpc_handshake_manager_add( + c->handshake_mgr, + grpc_http_connect_handshaker_create(proxy_name, args->server_name)); + gpr_free(proxy_name); + } gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); args->args = final_args; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index ef731657af..1dd7fef76f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1928,7 +1928,8 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, grpc_error *parse_error = GRPC_ERROR_NONE; for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) { - parse_error = grpc_http_parser_parse(&parser, t->read_buffer.slices[i]); + parse_error = + grpc_http_parser_parse(&parser, t->read_buffer.slices[i], NULL); } if (parse_error == GRPC_ERROR_NONE && (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) { diff --git a/src/core/lib/http/format_request.c b/src/core/lib/http/format_request.c index 9240356fea..e818b70113 100644 --- a/src/core/lib/http/format_request.c +++ b/src/core/lib/http/format_request.c @@ -44,7 +44,7 @@ #include "src/core/lib/support/string.h" static void fill_common_header(const grpc_httpcli_request *request, - gpr_strvec *buf) { + gpr_strvec *buf, bool connection_close) { size_t i; gpr_strvec_add(buf, gpr_strdup(request->http.path)); gpr_strvec_add(buf, gpr_strdup(" HTTP/1.0\r\n")); @@ -52,7 +52,8 @@ static void fill_common_header(const grpc_httpcli_request *request, gpr_strvec_add(buf, gpr_strdup("Host: ")); gpr_strvec_add(buf, gpr_strdup(request->host)); gpr_strvec_add(buf, gpr_strdup("\r\n")); - gpr_strvec_add(buf, gpr_strdup("Connection: close\r\n")); + if (connection_close) + gpr_strvec_add(buf, gpr_strdup("Connection: close\r\n")); gpr_strvec_add(buf, gpr_strdup("User-Agent: " GRPC_HTTPCLI_USER_AGENT "\r\n")); /* user supplied headers */ @@ -71,7 +72,7 @@ gpr_slice grpc_httpcli_format_get_request(const grpc_httpcli_request *request) { gpr_strvec_init(&out); gpr_strvec_add(&out, gpr_strdup("GET ")); - fill_common_header(request, &out); + fill_common_header(request, &out, true); gpr_strvec_add(&out, gpr_strdup("\r\n")); flat = gpr_strvec_flatten(&out, &flat_len); @@ -91,7 +92,7 @@ gpr_slice grpc_httpcli_format_post_request(const grpc_httpcli_request *request, gpr_strvec_init(&out); gpr_strvec_add(&out, gpr_strdup("POST ")); - fill_common_header(request, &out); + fill_common_header(request, &out, true); if (body_bytes) { uint8_t has_content_type = 0; for (i = 0; i < request->http.hdr_count; i++) { @@ -118,3 +119,16 @@ gpr_slice grpc_httpcli_format_post_request(const grpc_httpcli_request *request, return gpr_slice_new(tmp, out_len, gpr_free); } + +gpr_slice grpc_httpcli_format_connect_request( + const grpc_httpcli_request *request) { + gpr_strvec out; + gpr_strvec_init(&out); + gpr_strvec_add(&out, gpr_strdup("CONNECT ")); + fill_common_header(request, &out, false); + gpr_strvec_add(&out, gpr_strdup("\r\n")); + size_t flat_len; + char *flat = gpr_strvec_flatten(&out, &flat_len); + gpr_strvec_destroy(&out); + return gpr_slice_new(flat, flat_len, gpr_free); +} diff --git a/src/core/lib/http/format_request.h b/src/core/lib/http/format_request.h index 1543efe4b0..7abd55f2f7 100644 --- a/src/core/lib/http/format_request.h +++ b/src/core/lib/http/format_request.h @@ -41,5 +41,7 @@ gpr_slice grpc_httpcli_format_get_request(const grpc_httpcli_request *request); gpr_slice grpc_httpcli_format_post_request(const grpc_httpcli_request *request, const char *body_bytes, size_t body_size); +gpr_slice grpc_httpcli_format_connect_request( + const grpc_httpcli_request *request); #endif /* GRPC_CORE_LIB_HTTP_FORMAT_REQUEST_H */ diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 18135bcb58..7f3c2d120d 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -146,7 +146,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, if (GPR_SLICE_LENGTH(req->incoming.slices[i])) { req->have_read_byte = 1; grpc_error *err = - grpc_http_parser_parse(&req->parser, req->incoming.slices[i]); + grpc_http_parser_parse(&req->parser, req->incoming.slices[i], NULL); if (err != GRPC_ERROR_NONE) { finish(exec_ctx, req, err); return; diff --git a/src/core/lib/http/httpcli.h b/src/core/lib/http/httpcli.h index 662e176f4c..320c0f86c6 100644 --- a/src/core/lib/http/httpcli.h +++ b/src/core/lib/http/httpcli.h @@ -93,8 +93,7 @@ void grpc_httpcli_context_destroy(grpc_httpcli_context *context); 'request' contains request parameters - these are caller owned and can be destroyed once the call returns 'deadline' contains a deadline for the request (or gpr_inf_future) - 'on_response' is a callback to report results to (and 'user_data' is a user - supplied pointer to pass to said call) */ + 'on_response' is a callback to report results to */ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, const grpc_httpcli_request *request, @@ -113,8 +112,7 @@ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, 'deadline' contains a deadline for the request (or gpr_inf_future) 'em' points to a caller owned event manager that must be alive for the lifetime of the request - 'on_response' is a callback to report results to (and 'user_data' is a user - supplied pointer to pass to said call) + 'on_response' is a callback to report results to Does not support ?var1=val1&var2=val2 in the path. */ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c index 92ed08ae51..be9e9b6b63 100644 --- a/src/core/lib/http/parser.c +++ b/src/core/lib/http/parser.c @@ -33,6 +33,7 @@ #include "src/core/lib/http/parser.h" +#include <stdbool.h> #include <string.h> #include <grpc/support/alloc.h> @@ -200,7 +201,8 @@ done: return error; } -static grpc_error *finish_line(grpc_http_parser *parser) { +static grpc_error *finish_line(grpc_http_parser *parser, + bool *found_body_start) { grpc_error *err; switch (parser->state) { case GRPC_HTTP_FIRST_LINE: @@ -211,6 +213,7 @@ static grpc_error *finish_line(grpc_http_parser *parser) { case GRPC_HTTP_HEADERS: if (parser->cur_line_length == parser->cur_line_end_length) { parser->state = GRPC_HTTP_BODY; + *found_body_start = true; break; } err = add_header(parser); @@ -274,7 +277,8 @@ static bool check_line(grpc_http_parser *parser) { return false; } -static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte) { +static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte, + bool *found_body_start) { switch (parser->state) { case GRPC_HTTP_FIRST_LINE: case GRPC_HTTP_HEADERS: @@ -282,20 +286,18 @@ static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte) { if (grpc_http1_trace) gpr_log(GPR_ERROR, "HTTP client max line length (%d) exceeded", GRPC_HTTP_PARSER_MAX_HEADER_LENGTH); - return 0; + return GRPC_ERROR_NONE; } parser->cur_line[parser->cur_line_length] = byte; parser->cur_line_length++; if (check_line(parser)) { - return finish_line(parser); - } else { - return GRPC_ERROR_NONE; + return finish_line(parser, found_body_start); } - GPR_UNREACHABLE_CODE(return 0); + return GRPC_ERROR_NONE; case GRPC_HTTP_BODY: return addbyte_body(parser, byte); } - GPR_UNREACHABLE_CODE(return 0); + GPR_UNREACHABLE_CODE(return GRPC_ERROR_NONE); } void grpc_http_parser_init(grpc_http_parser *parser, grpc_http_type type, @@ -331,14 +333,15 @@ void grpc_http_response_destroy(grpc_http_response *response) { gpr_free(response->hdrs); } -grpc_error *grpc_http_parser_parse(grpc_http_parser *parser, gpr_slice slice) { - size_t i; - - for (i = 0; i < GPR_SLICE_LENGTH(slice); i++) { - grpc_error *err = addbyte(parser, GPR_SLICE_START_PTR(slice)[i]); +grpc_error *grpc_http_parser_parse(grpc_http_parser *parser, gpr_slice slice, + size_t *start_of_body) { + for (size_t i = 0; i < GPR_SLICE_LENGTH(slice); i++) { + bool found_body_start = false; + grpc_error *err = + addbyte(parser, GPR_SLICE_START_PTR(slice)[i], &found_body_start); if (err != GRPC_ERROR_NONE) return err; + if (found_body_start && start_of_body != NULL) *start_of_body = i + 1; } - return GRPC_ERROR_NONE; } diff --git a/src/core/lib/http/parser.h b/src/core/lib/http/parser.h index 6df3cc8b13..fab42979cd 100644 --- a/src/core/lib/http/parser.h +++ b/src/core/lib/http/parser.h @@ -113,7 +113,9 @@ void grpc_http_parser_init(grpc_http_parser *parser, grpc_http_type type, void *request_or_response); void grpc_http_parser_destroy(grpc_http_parser *parser); -grpc_error *grpc_http_parser_parse(grpc_http_parser *parser, gpr_slice slice); +/* Sets \a start_of_body to the offset in \a slice of the start of the body. */ +grpc_error *grpc_http_parser_parse(grpc_http_parser *parser, gpr_slice slice, + size_t *start_of_body); grpc_error *grpc_http_parser_eof(grpc_http_parser *parser); void grpc_http_request_destroy(grpc_http_request *request); diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 894efc0b23..910a6f6532 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -64,7 +64,8 @@ struct grpc_endpoint_vtable { /* When data is available on the connection, calls the callback with slices. Callback success indicates that the endpoint can accept more reads, failure indicates the endpoint is closed. - Valid slices may be placed into \a slices even on callback success == 0. */ + Valid slices may be placed into \a slices even when the callback is + invoked with error != GRPC_ERROR_NONE. */ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_slice_buffer *slices, grpc_closure *cb); diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 1895ee6245..4d20ecf922 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -43,7 +43,6 @@ typedef struct grpc_workqueue grpc_workqueue; typedef struct grpc_combiner grpc_combiner; -#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER /** Execution context. * A bag of data that collects information along a callstack. * Generally created at public API entry points, and passed down as @@ -58,12 +57,13 @@ typedef struct grpc_combiner grpc_combiner; * should actively try to finish up and get this thread back to its owner * * CONVENTIONS: - * Instance of this must ALWAYS be constructed on the stack, never - * heap allocated. Instances and pointers to them must always be called - * exec_ctx. Instances are always passed as the first argument - * to a function that takes it, and always as a pointer (grpc_exec_ctx - * is never copied). + * - Instance of this must ALWAYS be constructed on the stack, never + * heap allocated. + * - Instances and pointers to them must always be called exec_ctx. + * - Instances are always passed as the first argument to a function that + * takes it, and always as a pointer (grpc_exec_ctx is never copied). */ +#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER struct grpc_exec_ctx { grpc_closure_list closure_list; /** currently active combiner: updated only via combiner.c */ diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c index d7d5f6f157..78ef46d042 100644 --- a/src/core/lib/iomgr/socket_windows.c +++ b/src/core/lib/iomgr/socket_windows.c @@ -84,7 +84,7 @@ void grpc_winsocket_shutdown(grpc_winsocket *winsocket) { DisconnectEx(winsocket->socket, NULL, 0, 0); } else { char *utf8_message = gpr_format_message(WSAGetLastError()); - gpr_log(GPR_ERROR, "Unable to retrieve DisconnectEx pointer : %s", + gpr_log(GPR_INFO, "Unable to retrieve DisconnectEx pointer : %s", utf8_message); gpr_free(utf8_message); } |