aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/census/trace_context.c86
-rw-r--r--src/core/ext/census/trace_context.h68
-rw-r--r--src/core/ext/census/tracing.c16
-rw-r--r--src/core/ext/client_config/client_channel.c34
-rw-r--r--src/core/ext/client_config/client_channel.h13
-rw-r--r--src/core/ext/client_config/http_connect_handshaker.c275
-rw-r--r--src/core/ext/client_config/http_connect_handshaker.h47
-rw-r--r--src/core/ext/client_config/lb_policy_factory.c58
-rw-r--r--src/core/ext/client_config/lb_policy_factory.h42
-rw-r--r--src/core/ext/client_config/resolver_factory.h5
-rw-r--r--src/core/ext/client_config/resolver_registry.c4
-rw-r--r--src/core/ext/client_config/resolver_registry.h3
-rw-r--r--src/core/ext/client_config/resolver_result.c129
-rw-r--r--src/core/ext/client_config/resolver_result.h106
-rw-r--r--src/core/ext/client_config/subchannel.c26
-rw-r--r--src/core/ext/client_config/subchannel.h2
-rw-r--r--src/core/ext/client_config/subchannel_index.c7
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c155
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c22
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c34
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c71
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c54
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c25
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c23
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c96
25 files changed, 992 insertions, 409 deletions
diff --git a/src/core/ext/census/trace_context.c b/src/core/ext/census/trace_context.c
new file mode 100644
index 0000000000..fbb20d3448
--- /dev/null
+++ b/src/core/ext/census/trace_context.c
@@ -0,0 +1,86 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/census/trace_context.h"
+
+#include <grpc/census.h>
+#include <grpc/support/log.h>
+#include <stdbool.h>
+
+#include "third_party/nanopb/pb_decode.h"
+#include "third_party/nanopb/pb_encode.h"
+
+// This function assumes the TraceContext is valid.
+size_t encode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer,
+ const size_t buf_size) {
+ // Create a stream that will write to our buffer.
+ pb_ostream_t stream = pb_ostream_from_buffer(buffer, buf_size);
+
+ // encode message
+ bool status = pb_encode(&stream, google_trace_TraceContext_fields, ctxt);
+
+ if (!status) {
+ gpr_log(GPR_DEBUG, "TraceContext encoding failed: %s",
+ PB_GET_ERROR(&stream));
+ return 0;
+ }
+
+ return stream.bytes_written;
+}
+
+bool decode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer,
+ const size_t nbytes) {
+ // Create a stream that reads nbytes from the buffer.
+ pb_istream_t stream = pb_istream_from_buffer(buffer, nbytes);
+
+ // decode message
+ bool status = pb_decode(&stream, google_trace_TraceContext_fields, ctxt);
+
+ if (!status) {
+ gpr_log(GPR_DEBUG, "TraceContext decoding failed: %s",
+ PB_GET_ERROR(&stream));
+ return false;
+ }
+
+ // check fields
+ if (!ctxt->has_trace_id) {
+ gpr_log(GPR_DEBUG, "Invalid TraceContext: missing trace_id");
+ return false;
+ }
+ if (!ctxt->has_span_id) {
+ gpr_log(GPR_DEBUG, "Invalid TraceContext: missing span_id");
+ return false;
+ }
+
+ return true;
+}
diff --git a/src/core/ext/census/trace_context.h b/src/core/ext/census/trace_context.h
new file mode 100644
index 0000000000..ee71fef460
--- /dev/null
+++ b/src/core/ext/census/trace_context.h
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* Functions for manipulating trace contexts as defined in
+ src/proto/census/trace.proto */
+#ifndef GRPC_CORE_EXT_CENSUS_TRACE_CONTEXT_H
+#define GRPC_CORE_EXT_CENSUS_TRACE_CONTEXT_H
+
+#include "src/core/ext/census/gen/trace_context.pb.h"
+
+/* Maximum number of bytes required to encode a TraceContext (31)
+1 byte for trace_id field
+1 byte for trace_id length
+1 byte for trace_id.hi field
+8 bytes for trace_id.hi (uint64_t)
+1 byte for trace_id.lo field
+8 bytes for trace_id.lo (uint64_t)
+1 byte for span_id field
+8 bytes for span_id (uint64_t)
+1 byte for is_sampled field
+1 byte for is_sampled (bool) */
+#define TRACE_MAX_CONTEXT_SIZE 31
+
+/* Encode a trace context (ctxt) into proto format to the buffer provided. The
+size of buffer must be at least TRACE_MAX_CONTEXT_SIZE. On success, returns the
+number of bytes successfully encoded into buffer. On failure, returns 0. */
+size_t encode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer,
+ const size_t buf_size);
+
+/* Decode a proto-encoded TraceContext from the provided buffer into the
+TraceContext structure (ctxt). The function expects to be supplied the number
+of bytes to be read from buffer (nbytes). This function will also validate that
+the TraceContext has a span_id and a trace_id, and will return false if either
+of these do not exist. On success, returns true and false otherwise. */
+bool decode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer,
+ const size_t nbytes);
+
+#endif
diff --git a/src/core/ext/census/tracing.c b/src/core/ext/census/tracing.c
index 3b5d6dab2b..8f0e12296d 100644
--- a/src/core/ext/census/tracing.c
+++ b/src/core/ext/census/tracing.c
@@ -31,15 +31,19 @@
*
*/
+//#include "src/core/ext/census/tracing.h"
+
#include <grpc/census.h>
/* TODO(aveitch): These are all placeholder implementations. */
-int census_trace_mask(const census_context *context) {
- return CENSUS_TRACE_MASK_NONE;
-}
+// int census_trace_mask(const census_context *context) {
+// return CENSUS_TRACE_MASK_NONE;
+// }
+
+// void census_set_trace_mask(int trace_mask) {}
-void census_set_trace_mask(int trace_mask) {}
+// void census_trace_print(census_context *context, uint32_t type,
+// const char *buffer, size_t n) {}
-void census_trace_print(census_context *context, uint32_t type,
- const char *buffer, size_t n) {}
+// void SetTracerParams(const Params& params);
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 6b779efe57..b2b4fea83c 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -42,6 +42,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
@@ -63,6 +64,8 @@ typedef struct client_channel_channel_data {
grpc_resolver *resolver;
/** have we started resolving this channel */
bool started_resolving;
+ /** client channel factory */
+ grpc_client_channel_factory *client_channel_factory;
/** mutex protecting client configuration, including all
variables below in this data structure */
@@ -173,20 +176,28 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
if (chand->resolver_result != NULL) {
- lb_policy = grpc_resolver_result_get_lb_policy(chand->resolver_result);
+ grpc_lb_policy_args lb_policy_args;
+ lb_policy_args.server_name =
+ grpc_resolver_result_get_server_name(chand->resolver_result);
+ lb_policy_args.addresses =
+ grpc_resolver_result_get_addresses(chand->resolver_result);
+ lb_policy_args.additional_args =
+ grpc_resolver_result_get_lb_policy_args(chand->resolver_result);
+ lb_policy_args.client_channel_factory = chand->client_channel_factory;
+ lb_policy = grpc_lb_policy_create(
+ exec_ctx,
+ grpc_resolver_result_get_lb_policy_name(chand->resolver_result),
+ &lb_policy_args);
if (lb_policy != NULL) {
- GRPC_LB_POLICY_REF(lb_policy, "channel");
GRPC_LB_POLICY_REF(lb_policy, "config_change");
GRPC_ERROR_UNREF(state_error);
state =
grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
-
grpc_resolver_result_unref(exec_ctx, chand->resolver_result);
+ chand->resolver_result = NULL;
}
- chand->resolver_result = NULL;
-
if (lb_policy != NULL) {
grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
chand->interested_parties);
@@ -346,6 +357,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
}
+ if (chand->client_channel_factory != NULL) {
+ grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
+ }
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
chand->lb_policy->interested_parties,
@@ -767,10 +781,12 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};
-void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
- grpc_resolver *resolver) {
+void grpc_client_channel_finish_initialization(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver,
+ grpc_client_channel_factory *client_channel_factory) {
/* post construction initialization: set the transport setup pointer */
+ GPR_ASSERT(client_channel_factory != NULL);
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu);
@@ -784,6 +800,8 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result,
&chand->on_resolver_result_changed);
}
+ chand->client_channel_factory = client_channel_factory;
+ grpc_client_channel_factory_ref(client_channel_factory);
gpr_mu_unlock(&chand->mu);
}
diff --git a/src/core/ext/client_config/client_channel.h b/src/core/ext/client_config/client_channel.h
index 1e47ad34ad..abb5ac4d87 100644
--- a/src/core/ext/client_config/client_channel.h
+++ b/src/core/ext/client_config/client_channel.h
@@ -34,6 +34,7 @@
#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H
#define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H
+#include "src/core/ext/client_config/client_channel_factory.h"
#include "src/core/ext/client_config/resolver.h"
#include "src/core/lib/channel/channel_stack.h"
@@ -46,12 +47,12 @@
extern const grpc_channel_filter grpc_client_channel_filter;
-/* post-construction initializer to let the client channel know which
- transport setup it should cancel upon destruction, or initiate when it needs
- a connection */
-void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
- grpc_resolver *resolver);
+/* Post-construction initializer to give the client channel its resolver
+ and factory. */
+void grpc_client_channel_finish_initialization(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver,
+ grpc_client_channel_factory *client_channel_factory);
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
diff --git a/src/core/ext/client_config/http_connect_handshaker.c b/src/core/ext/client_config/http_connect_handshaker.c
new file mode 100644
index 0000000000..fda1df173e
--- /dev/null
+++ b/src/core/ext/client_config/http_connect_handshaker.c
@@ -0,0 +1,275 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_config/http_connect_handshaker.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/client_config/uri_parser.h"
+#include "src/core/lib/http/format_request.h"
+#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/support/env.h"
+
+typedef struct http_connect_handshaker {
+ // Base class. Must be first.
+ grpc_handshaker base;
+
+ char* proxy_server;
+ char* server_name;
+
+ // State saved while performing the handshake.
+ grpc_endpoint* endpoint;
+ grpc_channel_args* args;
+ grpc_handshaker_done_cb cb;
+ void* user_data;
+
+ // Objects for processing the HTTP CONNECT request and response.
+ gpr_slice_buffer write_buffer;
+ gpr_slice_buffer* read_buffer; // Ownership passes through this object.
+ grpc_closure request_done_closure;
+ grpc_closure response_read_closure;
+ grpc_http_parser http_parser;
+ grpc_http_response http_response;
+ grpc_timer timeout_timer;
+
+ gpr_refcount refcount;
+} http_connect_handshaker;
+
+// Unref and clean up handshaker.
+static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) {
+ if (gpr_unref(&handshaker->refcount)) {
+ gpr_free(handshaker->proxy_server);
+ gpr_free(handshaker->server_name);
+ gpr_slice_buffer_destroy(&handshaker->write_buffer);
+ grpc_http_parser_destroy(&handshaker->http_parser);
+ grpc_http_response_destroy(&handshaker->http_response);
+ gpr_free(handshaker);
+ }
+}
+
+// Callback invoked when deadline is exceeded.
+static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ http_connect_handshaker* handshaker = arg;
+ if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled.
+ grpc_endpoint_shutdown(exec_ctx, handshaker->endpoint);
+ }
+ http_connect_handshaker_unref(handshaker);
+}
+
+// Callback invoked when finished writing HTTP CONNECT request.
+static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ http_connect_handshaker* handshaker = arg;
+ if (error != GRPC_ERROR_NONE) {
+ // If the write failed, invoke the callback immediately with the error.
+ handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args,
+ handshaker->read_buffer, handshaker->user_data,
+ GRPC_ERROR_REF(error));
+ } else {
+ // Otherwise, read the response.
+ grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer,
+ &handshaker->response_read_closure);
+ }
+}
+
+// Callback invoked for reading HTTP CONNECT response.
+static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ http_connect_handshaker* handshaker = arg;
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback.
+ goto done;
+ }
+ // Add buffer to parser.
+ for (size_t i = 0; i < handshaker->read_buffer->count; ++i) {
+ if (GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) {
+ size_t body_start_offset = 0;
+ error = grpc_http_parser_parse(&handshaker->http_parser,
+ handshaker->read_buffer->slices[i],
+ &body_start_offset);
+ if (error != GRPC_ERROR_NONE) goto done;
+ if (handshaker->http_parser.state == GRPC_HTTP_BODY) {
+ // We've gotten back a successul response, so stop the timeout timer.
+ grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer);
+ // Remove the data we've already read from the read buffer,
+ // leaving only the leftover bytes (if any).
+ gpr_slice_buffer tmp_buffer;
+ gpr_slice_buffer_init(&tmp_buffer);
+ if (body_start_offset <
+ GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i])) {
+ gpr_slice_buffer_add(
+ &tmp_buffer,
+ gpr_slice_split_tail(&handshaker->read_buffer->slices[i],
+ body_start_offset));
+ }
+ gpr_slice_buffer_addn(&tmp_buffer,
+ &handshaker->read_buffer->slices[i + 1],
+ handshaker->read_buffer->count - i - 1);
+ gpr_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer);
+ gpr_slice_buffer_destroy(&tmp_buffer);
+ break;
+ }
+ }
+ }
+ // If we're not done reading the response, read more data.
+ // TODO(roth): In practice, I suspect that the response to a CONNECT
+ // request will never include a body, in which case this check is
+ // sufficient. However, the language of RFC-2817 doesn't explicitly
+ // forbid the response from including a body. If there is a body,
+ // it's possible that we might have parsed part but not all of the
+ // body, in which case this check will cause us to fail to parse the
+ // remainder of the body. If that ever becomes an issue, we may
+ // need to fix the HTTP parser to understand when the body is
+ // complete (e.g., handling chunked transfer encoding or looking
+ // at the Content-Length: header).
+ if (handshaker->http_parser.state != GRPC_HTTP_BODY) {
+ gpr_slice_buffer_reset_and_unref(handshaker->read_buffer);
+ grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer,
+ &handshaker->response_read_closure);
+ return;
+ }
+ // Make sure we got a 2xx response.
+ if (handshaker->http_response.status < 200 ||
+ handshaker->http_response.status >= 300) {
+ char* msg;
+ gpr_asprintf(&msg, "HTTP proxy returned response code %d",
+ handshaker->http_response.status);
+ error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ }
+done:
+ // Invoke handshake-done callback.
+ handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args,
+ handshaker->read_buffer, handshaker->user_data, error);
+}
+
+//
+// Public handshaker methods
+//
+
+static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker_in) {
+ http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
+ http_connect_handshaker_unref(handshaker);
+}
+
+static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker) {}
+
+static void http_connect_handshaker_do_handshake(
+ grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in,
+ grpc_endpoint* endpoint, grpc_channel_args* args,
+ gpr_slice_buffer* read_buffer, gpr_timespec deadline,
+ grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb,
+ void* user_data) {
+ http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
+ // Save state in the handshaker object.
+ handshaker->endpoint = endpoint;
+ handshaker->args = args;
+ handshaker->cb = cb;
+ handshaker->user_data = user_data;
+ handshaker->read_buffer = read_buffer;
+ // Send HTTP CONNECT request.
+ gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s",
+ handshaker->server_name, handshaker->proxy_server);
+ grpc_httpcli_request request;
+ memset(&request, 0, sizeof(request));
+ request.host = handshaker->proxy_server;
+ request.http.method = "CONNECT";
+ request.http.path = handshaker->server_name;
+ request.handshaker = &grpc_httpcli_plaintext;
+ gpr_slice request_slice = grpc_httpcli_format_connect_request(&request);
+ gpr_slice_buffer_add(&handshaker->write_buffer, request_slice);
+ grpc_endpoint_write(exec_ctx, endpoint, &handshaker->write_buffer,
+ &handshaker->request_done_closure);
+ // Set timeout timer. The timer gets a reference to the handshaker.
+ gpr_ref(&handshaker->refcount);
+ grpc_timer_init(exec_ctx, &handshaker->timeout_timer,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ on_timeout, handshaker, gpr_now(GPR_CLOCK_MONOTONIC));
+}
+
+static const struct grpc_handshaker_vtable http_connect_handshaker_vtable = {
+ http_connect_handshaker_destroy, http_connect_handshaker_shutdown,
+ http_connect_handshaker_do_handshake};
+
+grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
+ const char* server_name) {
+ GPR_ASSERT(proxy_server != NULL);
+ GPR_ASSERT(server_name != NULL);
+ http_connect_handshaker* handshaker =
+ gpr_malloc(sizeof(http_connect_handshaker));
+ memset(handshaker, 0, sizeof(*handshaker));
+ grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base);
+ handshaker->proxy_server = gpr_strdup(proxy_server);
+ handshaker->server_name = gpr_strdup(server_name);
+ gpr_slice_buffer_init(&handshaker->write_buffer);
+ grpc_closure_init(&handshaker->request_done_closure, on_write_done,
+ handshaker);
+ grpc_closure_init(&handshaker->response_read_closure, on_read_done,
+ handshaker);
+ grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE,
+ &handshaker->http_response);
+ gpr_ref_init(&handshaker->refcount, 1);
+ return &handshaker->base;
+}
+
+char* grpc_get_http_proxy_server() {
+ char* uri_str = gpr_getenv("http_proxy");
+ if (uri_str == NULL) return NULL;
+ grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */);
+ char* proxy_name = NULL;
+ if (uri == NULL || uri->authority == NULL) {
+ gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var");
+ goto done;
+ }
+ if (strcmp(uri->scheme, "http") != 0) {
+ gpr_log(GPR_ERROR, "'%s' scheme not supported in proxy URI", uri->scheme);
+ goto done;
+ }
+ if (strchr(uri->authority, '@') != NULL) {
+ gpr_log(GPR_ERROR, "userinfo not supported in proxy URI");
+ goto done;
+ }
+ proxy_name = gpr_strdup(uri->authority);
+done:
+ gpr_free(uri_str);
+ grpc_uri_destroy(uri);
+ return proxy_name;
+}
diff --git a/src/core/ext/client_config/http_connect_handshaker.h b/src/core/ext/client_config/http_connect_handshaker.h
new file mode 100644
index 0000000000..1fc3948267
--- /dev/null
+++ b/src/core/ext/client_config/http_connect_handshaker.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H
+
+#include "src/core/lib/channel/handshaker.h"
+
+/// Does NOT take ownership of \a proxy_server or \a server_name.
+grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
+ const char* server_name);
+
+/// Returns the name of the proxy to use, or NULL if no proxy is configured.
+/// Caller takes ownership of result.
+char* grpc_get_http_proxy_server();
+
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H */
diff --git a/src/core/ext/client_config/lb_policy_factory.c b/src/core/ext/client_config/lb_policy_factory.c
index 70e46ef3cf..c17af91a09 100644
--- a/src/core/ext/client_config/lb_policy_factory.c
+++ b/src/core/ext/client_config/lb_policy_factory.c
@@ -31,8 +31,66 @@
*
*/
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
#include "src/core/ext/client_config/lb_policy_factory.h"
+grpc_lb_addresses* grpc_lb_addresses_create(size_t num_addresses) {
+ grpc_lb_addresses* addresses = gpr_malloc(sizeof(grpc_lb_addresses));
+ addresses->num_addresses = num_addresses;
+ const size_t addresses_size = sizeof(grpc_lb_address) * num_addresses;
+ addresses->addresses = gpr_malloc(addresses_size);
+ memset(addresses->addresses, 0, addresses_size);
+ return addresses;
+}
+
+grpc_lb_addresses* grpc_lb_addresses_copy(grpc_lb_addresses* addresses,
+ void* (*user_data_copy)(void*)) {
+ grpc_lb_addresses* new_addresses =
+ grpc_lb_addresses_create(addresses->num_addresses);
+ memcpy(new_addresses->addresses, addresses->addresses,
+ sizeof(grpc_lb_address) * addresses->num_addresses);
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (new_addresses->addresses[i].balancer_name != NULL) {
+ new_addresses->addresses[i].balancer_name =
+ gpr_strdup(new_addresses->addresses[i].balancer_name);
+ }
+ if (user_data_copy != NULL) {
+ new_addresses->addresses[i].user_data =
+ user_data_copy(new_addresses->addresses[i].user_data);
+ }
+ }
+ return new_addresses;
+}
+
+void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
+ void* address, size_t address_len,
+ bool is_balancer, char* balancer_name,
+ void* user_data) {
+ GPR_ASSERT(index < addresses->num_addresses);
+ grpc_lb_address* target = &addresses->addresses[index];
+ memcpy(target->address.addr, address, address_len);
+ target->address.len = address_len;
+ target->is_balancer = is_balancer;
+ target->balancer_name = balancer_name;
+ target->user_data = user_data;
+}
+
+void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses,
+ void (*user_data_destroy)(void*)) {
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ gpr_free(addresses->addresses[i].balancer_name);
+ if (user_data_destroy != NULL) {
+ user_data_destroy(addresses->addresses[i].user_data);
+ }
+ }
+ gpr_free(addresses->addresses);
+ gpr_free(addresses);
+}
+
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
factory->vtable->ref(factory);
}
diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index 7191ca7d89..ade55704f2 100644
--- a/src/core/ext/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -36,9 +36,9 @@
#include "src/core/ext/client_config/client_channel_factory.h"
#include "src/core/ext/client_config/lb_policy.h"
-#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/resolve_address.h"
typedef struct grpc_lb_policy_factory grpc_lb_policy_factory;
typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable;
@@ -53,14 +53,48 @@ struct grpc_lb_policy_factory {
* Those who don't will simply ignore it and will correspondingly return NULL in
* their namesake pick() output argument. */
typedef struct grpc_lb_address {
- grpc_resolved_address *resolved_address;
+ grpc_resolved_address address;
+ bool is_balancer;
+ char *balancer_name; /* For secure naming. */
void *user_data;
} grpc_lb_address;
-typedef struct grpc_lb_policy_args {
- grpc_lb_address *addresses;
+typedef struct grpc_lb_addresses {
size_t num_addresses;
+ grpc_lb_address *addresses;
+} grpc_lb_addresses;
+
+/** Returns a grpc_addresses struct with enough space for
+ * \a num_addresses addresses. */
+grpc_lb_addresses *grpc_lb_addresses_create(size_t num_addresses);
+
+/** Creates a copy of \a addresses. If \a user_data_copy is not NULL,
+ * it will be invoked to copy the \a user_data field of each address. */
+grpc_lb_addresses *grpc_lb_addresses_copy(grpc_lb_addresses *addresses,
+ void *(*user_data_copy)(void *));
+
+/** Sets the value of the address at index \a index of \a addresses.
+ * \a address is a socket address of length \a address_len.
+ * Takes ownership of \a balancer_name. */
+void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index,
+ void *address, size_t address_len,
+ bool is_balancer, char *balancer_name,
+ void *user_data);
+
+/** Destroys \a addresses. If \a user_data_destroy is not NULL, it will
+ * be invoked to destroy the \a user_data field of each address. */
+void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses,
+ void (*user_data_destroy)(void *));
+
+/** Arguments passed to LB policies. */
+/* TODO(roth, ctiller): Consider replacing this struct with
+ grpc_channel_args. See comment in resolver_result.h for details. */
+typedef struct grpc_lb_policy_args {
+ const char *server_name;
+ grpc_lb_addresses *addresses;
grpc_client_channel_factory *client_channel_factory;
+ /* Can be used to pass implementation-specific parameters to the LB policy. */
+ grpc_channel_args *additional_args;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {
diff --git a/src/core/ext/client_config/resolver_factory.h b/src/core/ext/client_config/resolver_factory.h
index f69bf79564..9ec5b9a70e 100644
--- a/src/core/ext/client_config/resolver_factory.h
+++ b/src/core/ext/client_config/resolver_factory.h
@@ -47,10 +47,7 @@ struct grpc_resolver_factory {
const grpc_resolver_factory_vtable *vtable;
};
-typedef struct grpc_resolver_args {
- grpc_uri *uri;
- grpc_client_channel_factory *client_channel_factory;
-} grpc_resolver_args;
+typedef struct grpc_resolver_args { grpc_uri *uri; } grpc_resolver_args;
struct grpc_resolver_factory_vtable {
void (*ref)(grpc_resolver_factory *factory);
diff --git a/src/core/ext/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c
index e7a4abd568..b5308a140c 100644
--- a/src/core/ext/client_config/resolver_registry.c
+++ b/src/core/ext/client_config/resolver_registry.c
@@ -128,15 +128,13 @@ static grpc_resolver_factory *resolve_factory(const char *target,
return factory;
}
-grpc_resolver *grpc_resolver_create(
- const char *target, grpc_client_channel_factory *client_channel_factory) {
+grpc_resolver *grpc_resolver_create(const char *target) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
grpc_resolver *resolver;
grpc_resolver_args args;
memset(&args, 0, sizeof(args));
args.uri = uri;
- args.client_channel_factory = client_channel_factory;
resolver = grpc_resolver_factory_create_resolver(factory, &args);
grpc_uri_destroy(uri);
return resolver;
diff --git a/src/core/ext/client_config/resolver_registry.h b/src/core/ext/client_config/resolver_registry.h
index 5ef1383cd3..92e248d548 100644
--- a/src/core/ext/client_config/resolver_registry.h
+++ b/src/core/ext/client_config/resolver_registry.h
@@ -55,8 +55,7 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
If a resolver factory was found, use it to instantiate a resolver and
return it.
If a resolver factory was not found, return NULL. */
-grpc_resolver *grpc_resolver_create(
- const char *target, grpc_client_channel_factory *client_channel_factory);
+grpc_resolver *grpc_resolver_create(const char *target);
/** Find a resolver factory given a name and return an (owned-by-the-caller)
* reference to it */
diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c
index c6c4166e83..63480d152b 100644
--- a/src/core/ext/client_config/resolver_result.c
+++ b/src/core/ext/client_config/resolver_result.c
@@ -1,75 +1,94 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
+//
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
#include "src/core/ext/client_config/resolver_result.h"
#include <string.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/channel/channel_args.h"
struct grpc_resolver_result {
gpr_refcount refs;
- grpc_lb_policy *lb_policy;
+ char* server_name;
+ grpc_lb_addresses* addresses;
+ char* lb_policy_name;
+ grpc_channel_args* lb_policy_args;
};
-grpc_resolver_result *grpc_resolver_result_create() {
- grpc_resolver_result *c = gpr_malloc(sizeof(*c));
- memset(c, 0, sizeof(*c));
- gpr_ref_init(&c->refs, 1);
- return c;
+grpc_resolver_result* grpc_resolver_result_create(
+ const char* server_name, grpc_lb_addresses* addresses,
+ const char* lb_policy_name, grpc_channel_args* lb_policy_args) {
+ grpc_resolver_result* result = gpr_malloc(sizeof(*result));
+ memset(result, 0, sizeof(*result));
+ gpr_ref_init(&result->refs, 1);
+ result->server_name = gpr_strdup(server_name);
+ result->addresses = addresses;
+ result->lb_policy_name = gpr_strdup(lb_policy_name);
+ result->lb_policy_args = lb_policy_args;
+ return result;
}
-void grpc_resolver_result_ref(grpc_resolver_result *c) { gpr_ref(&c->refs); }
+void grpc_resolver_result_ref(grpc_resolver_result* result) {
+ gpr_ref(&result->refs);
+}
-void grpc_resolver_result_unref(grpc_exec_ctx *exec_ctx,
- grpc_resolver_result *c) {
- if (gpr_unref(&c->refs)) {
- if (c->lb_policy != NULL) {
- GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "resolver_result");
- }
- gpr_free(c);
+void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
+ grpc_resolver_result* result) {
+ if (gpr_unref(&result->refs)) {
+ gpr_free(result->server_name);
+ grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */);
+ gpr_free(result->lb_policy_name);
+ grpc_channel_args_destroy(result->lb_policy_args);
+ gpr_free(result);
}
}
-void grpc_resolver_result_set_lb_policy(grpc_resolver_result *c,
- grpc_lb_policy *lb_policy) {
- GPR_ASSERT(c->lb_policy == NULL);
- if (lb_policy) {
- GRPC_LB_POLICY_REF(lb_policy, "resolver_result");
- }
- c->lb_policy = lb_policy;
+const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result) {
+ return result->server_name;
+}
+
+grpc_lb_addresses* grpc_resolver_result_get_addresses(
+ grpc_resolver_result* result) {
+ return result->addresses;
+}
+
+const char* grpc_resolver_result_get_lb_policy_name(
+ grpc_resolver_result* result) {
+ return result->lb_policy_name;
}
-grpc_lb_policy *grpc_resolver_result_get_lb_policy(grpc_resolver_result *c) {
- return c->lb_policy;
+grpc_channel_args* grpc_resolver_result_get_lb_policy_args(
+ grpc_resolver_result* result) {
+ return result->lb_policy_args;
}
diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h
index 402f7dbd7e..414c2e2482 100644
--- a/src/core/ext/client_config/resolver_result.h
+++ b/src/core/ext/client_config/resolver_result.h
@@ -1,52 +1,74 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
+//
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H
#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H
-#include "src/core/ext/client_config/lb_policy.h"
+#include "src/core/ext/client_config/lb_policy_factory.h"
+#include "src/core/lib/iomgr/resolve_address.h"
-/** Results reported from a grpc_resolver. */
+// TODO(roth, ctiller): In the long term, we are considering replacing
+// the resolver_result data structure with grpc_channel_args. The idea is
+// that the resolver will return a set of channel args that contains the
+// information that is currently in the resolver_result struct. For
+// example, there will be specific args indicating the set of addresses
+// and the name of the LB policy to instantiate. Note that if we did
+// this, we would probably want to change the data structure of
+// grpc_channel_args such to a hash table or AVL or some other data
+// structure that does not require linear search to find keys.
+
+/// Results reported from a grpc_resolver.
typedef struct grpc_resolver_result grpc_resolver_result;
-grpc_resolver_result *grpc_resolver_result_create();
-void grpc_resolver_result_ref(grpc_resolver_result *client_config);
-void grpc_resolver_result_unref(grpc_exec_ctx *exec_ctx,
- grpc_resolver_result *client_config);
+/// Takes ownership of \a addresses and \a lb_policy_args.
+grpc_resolver_result* grpc_resolver_result_create(
+ const char* server_name, grpc_lb_addresses* addresses,
+ const char* lb_policy_name, grpc_channel_args* lb_policy_args);
+void grpc_resolver_result_ref(grpc_resolver_result* result);
+void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
+ grpc_resolver_result* result);
+
+/// Caller does NOT take ownership of result.
+const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result);
+
+/// Caller does NOT take ownership of result.
+grpc_lb_addresses* grpc_resolver_result_get_addresses(
+ grpc_resolver_result* result);
+
+/// Caller does NOT take ownership of result.
+const char* grpc_resolver_result_get_lb_policy_name(
+ grpc_resolver_result* result);
-void grpc_resolver_result_set_lb_policy(grpc_resolver_result *client_config,
- grpc_lb_policy *lb_policy);
-grpc_lb_policy *grpc_resolver_result_get_lb_policy(
- grpc_resolver_result *client_config);
+/// Caller does NOT take ownership of result.
+grpc_channel_args* grpc_resolver_result_get_lb_policy_args(
+ grpc_resolver_result* result);
#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 4acc750fac..66feac65c3 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -33,6 +33,7 @@
#include "src/core/ext/client_config/subchannel.h"
+#include <limits.h>
#include <string.h>
#include <grpc/support/alloc.h>
@@ -344,21 +345,16 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
}
if (0 ==
strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
- if (c->args->args[i].type == GRPC_ARG_INTEGER) {
- if (c->args->args[i].value.integer >= 0) {
- gpr_backoff_init(
- &c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
- GRPC_SUBCHANNEL_RECONNECT_JITTER,
- GPR_MIN(c->args->args[i].value.integer,
- GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000),
- c->args->args[i].value.integer);
- } else {
- gpr_log(GPR_ERROR, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS
- " : must be non-negative");
- }
- } else {
- gpr_log(GPR_ERROR,
- GRPC_ARG_MAX_RECONNECT_BACKOFF_MS " : must be an integer");
+ const grpc_integer_options options = {-1, 0, INT_MAX};
+ const int value =
+ grpc_channel_arg_get_integer(&c->args->args[i], options);
+ if (value >= 0) {
+ gpr_backoff_init(
+ &c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
+ GRPC_SUBCHANNEL_RECONNECT_JITTER,
+ GPR_MIN(value,
+ GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000),
+ value);
}
}
}
diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h
index c51e96bc6b..e4805b6722 100644
--- a/src/core/ext/client_config/subchannel.h
+++ b/src/core/ext/client_config/subchannel.h
@@ -162,6 +162,8 @@ struct grpc_subchannel_args {
size_t filter_count;
/** Channel arguments to be supplied to the newly created channel */
const grpc_channel_args *args;
+ /** Server name */
+ const char *server_name;
/** Address to connect to */
grpc_resolved_address *addr;
};
diff --git a/src/core/ext/client_config/subchannel_index.c b/src/core/ext/client_config/subchannel_index.c
index 0ec83fb0ad..3aaed7d123 100644
--- a/src/core/ext/client_config/subchannel_index.c
+++ b/src/core/ext/client_config/subchannel_index.c
@@ -38,6 +38,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include "src/core/lib/channel/channel_args.h"
@@ -85,6 +86,7 @@ static grpc_subchannel_key *create_key(
} else {
k->args.filters = NULL;
}
+ k->args.server_name = gpr_strdup(args->server_name);
k->args.addr = gpr_malloc(sizeof(grpc_resolved_address));
k->args.addr->len = args->addr->len;
if (k->args.addr->len > 0) {
@@ -111,6 +113,8 @@ static int subchannel_key_compare(grpc_subchannel_key *a,
if (c != 0) return c;
c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
if (c != 0) return c;
+ c = strcmp(a->args.server_name, b->args.server_name);
+ if (c != 0) return c;
if (a->args.addr->len) {
c = memcmp(a->args.addr->addr, b->args.addr->addr, a->args.addr->len);
if (c != 0) return c;
@@ -126,9 +130,10 @@ static int subchannel_key_compare(grpc_subchannel_key *a,
void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *k) {
grpc_connector_unref(exec_ctx, k->connector);
- gpr_free(k->args.addr);
gpr_free((grpc_channel_args *)k->args.filters);
grpc_channel_args_destroy((grpc_channel_args *)k->args.args);
+ gpr_free((void *)k->args.server_name);
+ gpr_free(k->args.addr);
gpr_free(k);
}
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 7f3c4613c6..8a2fe6d72e 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -109,6 +109,7 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/client_config/client_channel_factory.h"
+#include "src/core/ext/client_config/lb_policy_factory.h"
#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/parse_address.h"
#include "src/core/ext/lb_policy/grpclb/grpclb.h"
@@ -121,18 +122,6 @@
int grpc_lb_glb_trace = 0;
-static void lb_addrs_destroy(grpc_lb_address *lb_addresses,
- size_t num_addresses) {
- /* free "resolved" addresses memblock */
- gpr_free(lb_addresses->resolved_address);
- for (size_t i = 0; i < num_addresses; ++i) {
- if (lb_addresses[i].user_data != NULL) {
- GRPC_MDELEM_UNREF(lb_addresses[i].user_data);
- }
- }
- gpr_free(lb_addresses);
-}
-
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
static void initial_metadata_add_lb_token(
@@ -295,6 +284,7 @@ typedef struct glb_lb_policy {
/** mutex protecting remaining members */
gpr_mu mu;
+ const char *server_name;
grpc_client_channel_factory *cc_factory;
/** for communicating with the LB server */
@@ -312,11 +302,8 @@ typedef struct glb_lb_policy {
* response has arrived. */
grpc_grpclb_serverlist *serverlist;
- /** total number of valid addresses received in \a serverlist */
- size_t num_ok_serverlist_addresses;
-
- /** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them */
- grpc_lb_address *lb_addresses;
+ /** addresses from \a serverlist */
+ grpc_lb_addresses *addresses;
/** list of picks that are waiting on RR's policy connectivity */
pending_pick *pending_picks;
@@ -369,26 +356,18 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
return true;
}
-/* populate \a addresses according to \a serverlist. Returns the number of
- * addresses successfully parsed and added to \a addresses */
-static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
- grpc_lb_address **lb_addresses) {
+/* Returns addresses extracted from \a serverlist. */
+static grpc_lb_addresses *process_serverlist(
+ const grpc_grpclb_serverlist *serverlist) {
size_t num_valid = 0;
/* first pass: count how many are valid in order to allocate the necessary
* memory in a single block */
for (size_t i = 0; i < serverlist->num_servers; ++i) {
if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
}
- if (num_valid == 0) {
- return 0;
- }
+ if (num_valid == 0) return NULL;
- /* allocate the memory block for the "resolved" addresses. */
- grpc_resolved_address *r_addrs_memblock =
- gpr_malloc(sizeof(grpc_resolved_address) * num_valid);
- memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid);
- grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid);
- memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid);
+ grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid);
/* second pass: actually populate the addresses and LB tokens (aka user data
* to the outside world) to be read by the RR policy during its creation.
@@ -400,56 +379,58 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
GPR_ASSERT(addr_idx < num_valid);
const grpc_grpclb_server *server = serverlist->servers[sl_idx];
if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
- grpc_lb_address *const lb_addr = &lb_addrs[addr_idx];
/* address processing */
const uint16_t netorder_port = htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
const grpc_grpclb_ip_address *ip = &server->ip_address;
-
- lb_addr->resolved_address = &r_addrs_memblock[addr_idx];
- struct sockaddr_storage *sa =
- (struct sockaddr_storage *)lb_addr->resolved_address->addr;
- size_t *sa_len = &lb_addr->resolved_address->len;
- *sa_len = 0;
+ grpc_resolved_address addr;
+ memset(&addr, 0, sizeof(addr));
if (ip->size == 4) {
- struct sockaddr_in *addr4 = (struct sockaddr_in *)sa;
- *sa_len = sizeof(struct sockaddr_in);
- memset(addr4, 0, *sa_len);
+ addr.len = sizeof(struct sockaddr_in);
+ struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr.addr;
addr4->sin_family = AF_INET;
memcpy(&addr4->sin_addr, ip->bytes, ip->size);
addr4->sin_port = netorder_port;
} else if (ip->size == 16) {
- struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa;
- *sa_len = sizeof(struct sockaddr_in6);
- memset(addr6, 0, *sa_len);
+ addr.len = sizeof(struct sockaddr_in6);
+ struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr.addr;
addr6->sin6_family = AF_INET;
memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
addr6->sin6_port = netorder_port;
}
- GPR_ASSERT(*sa_len > 0);
/* lb token processing */
+ void *user_data;
if (server->has_load_balance_token) {
const size_t lb_token_size =
GPR_ARRAY_SIZE(server->load_balance_token) - 1;
grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
(uint8_t *)server->load_balance_token, lb_token_size);
- lb_addr->user_data = grpc_mdelem_from_metadata_strings(
+ user_data = grpc_mdelem_from_metadata_strings(
GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
} else {
gpr_log(GPR_ERROR,
"Missing LB token for backend address '%s'. The empty token will "
"be used instead",
- grpc_sockaddr_to_uri(lb_addr->resolved_address));
- lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
+ grpc_sockaddr_to_uri(&addr));
+ user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
}
+
+ grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
+ false /* is_balancer */,
+ NULL /* balancer_name */, user_data);
++addr_idx;
}
GPR_ASSERT(addr_idx == num_valid);
- *lb_addresses = lb_addrs;
- return num_valid;
+
+ return lb_addresses;
+}
+
+/* A plugin for grpc_lb_addresses_destroy that unrefs the LB token metadata. */
+static void lb_token_destroy(void *token) {
+ if (token != NULL) GRPC_MDELEM_UNREF(token);
}
static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
@@ -459,20 +440,17 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args args;
memset(&args, 0, sizeof(args));
+ args.server_name = glb_policy->server_name;
args.client_channel_factory = glb_policy->cc_factory;
- const size_t num_ok_addresses =
- process_serverlist(serverlist, &args.addresses);
- args.num_addresses = num_ok_addresses;
+ args.addresses = process_serverlist(serverlist);
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
- if (glb_policy->lb_addresses != NULL) {
+ if (glb_policy->addresses != NULL) {
/* dispose of the previous version */
- lb_addrs_destroy(glb_policy->lb_addresses,
- glb_policy->num_ok_serverlist_addresses);
+ grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy);
}
- glb_policy->num_ok_serverlist_addresses = num_ok_addresses;
- glb_policy->lb_addresses = args.addresses;
+ glb_policy->addresses = args.addresses;
return rr;
}
@@ -566,6 +544,19 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
+ /* Count the number of gRPC-LB addresses. There must be at least one.
+ * TODO(roth): For now, we ignore non-balancer addresses, but in the
+ * future, we may change the behavior such that we fall back to using
+ * the non-balancer addresses if we cannot reach any balancers. At that
+ * time, this should be changed to allow a list with no balancer addresses,
+ * since the resolver might fail to return a balancer address even when
+ * this is the right LB policy to use. */
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < args->addresses->num_addresses; ++i) {
+ if (args->addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ }
+ if (num_grpclb_addrs == 0) return NULL;
+
glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
memset(glb_policy, 0, sizeof(*glb_policy));
@@ -575,37 +566,35 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* policy is only instantiated and used in that case.
*
* Create a client channel over them to communicate with a LB service */
+ glb_policy->server_name = gpr_strdup(args->server_name);
glb_policy->cc_factory = args->client_channel_factory;
GPR_ASSERT(glb_policy->cc_factory != NULL);
- if (args->num_addresses == 0) {
- return NULL;
- }
-
- if (args->addresses[0].user_data != NULL) {
- gpr_log(GPR_ERROR,
- "This LB policy doesn't support user data. It will be ignored");
- }
/* construct a target from the addresses in args, given in the form
* ipvX://ip1:port1,ip2:port2,...
* TODO(dgq): support mixed ip version */
- char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses);
- addr_strs[0] = grpc_sockaddr_to_uri(args->addresses[0].resolved_address);
- for (size_t i = 1; i < args->num_addresses; i++) {
- if (args->addresses[i].user_data != NULL) {
+ char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
+ size_t addr_index = 0;
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ if (args->addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
-
- GPR_ASSERT(
- grpc_sockaddr_to_string(
- &addr_strs[i],
- args->addresses[i].resolved_address,
- true) == 0);
+ if (args->addresses->addresses[i].is_balancer) {
+ if (addr_index == 0) {
+ addr_strs[addr_index++] = grpc_sockaddr_to_uri(
+ &args->addresses->addresses[i].address);
+ } else {
+ GPR_ASSERT(grpc_sockaddr_to_string(
+ &addr_strs[addr_index++],
+ &args->addresses->addresses[i].address,
+ true) == 0);
+ }
+ }
}
size_t uri_path_len;
- char *target_uri_str = gpr_strjoin_sep(
- (const char **)addr_strs, args->num_addresses, ",", &uri_path_len);
+ char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs,
+ num_grpclb_addrs, ",", &uri_path_len);
/* will pick using pick_first */
glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
@@ -613,7 +602,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
gpr_free(target_uri_str);
- for (size_t i = 0; i < args->num_addresses; i++) {
+ for (size_t i = 0; i < num_grpclb_addrs; i++) {
gpr_free(addr_strs[i]);
}
gpr_free(addr_strs);
@@ -642,6 +631,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
GPR_ASSERT(glb_policy->pending_picks == NULL);
GPR_ASSERT(glb_policy->pending_pings == NULL);
+ gpr_free((void *)glb_policy->server_name);
grpc_channel_destroy(glb_policy->lb_channel);
glb_policy->lb_channel = NULL;
grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
@@ -649,9 +639,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
gpr_mu_destroy(&glb_policy->mu);
-
- lb_addrs_destroy(glb_policy->lb_addresses,
- glb_policy->num_ok_serverlist_addresses);
+ grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy);
gpr_free(glb_policy);
}
@@ -947,9 +935,8 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
* entities passed to glb_pick(). */
lb_client->lb_call = grpc_channel_create_pollset_set_call(
glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
- glb_policy->base.interested_parties, "/BalanceLoad",
- NULL, /* FIXME(dgq): which "host" value to use? */
- lb_client->deadline, NULL);
+ glb_policy->base.interested_parties,
+ "/grpc.lb.v1.LoadBalancer/BalanceLoad", NULL, lb_client->deadline, NULL);
grpc_metadata_array_init(&lb_client->initial_metadata_recv);
grpc_metadata_array_init(&lb_client->trailing_metadata_recv);
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 00b9660049..892b5feda9 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -441,23 +441,33 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
- if (args->num_addresses == 0) return NULL;
+ /* Find the number of backend addresses. We ignore balancer
+ * addresses, since we don't know how to handle them. */
+ size_t num_addrs = 0;
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ if (!args->addresses->addresses[i].is_balancer) ++num_addrs;
+ }
+ if (num_addrs == 0) return NULL;
pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_addresses);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->num_addresses);
+ p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_addrs);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->num_addresses; i++) {
- if (args->addresses[i].user_data != NULL) {
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ /* Skip balancer addresses, since we only know how to handle backends. */
+ if (args->addresses->addresses[i].is_balancer) continue;
+
+ if (args->addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr = args->addresses[i].resolved_address;
+ sc_args.server_name = args->server_name;
+ sc_args.addr = &args->addresses->addresses[i].address;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index eee3dce249..dc23f12015 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -130,10 +130,6 @@ struct round_robin_lb_policy {
/** total number of addresses received at creation time */
size_t num_addresses;
- /** array holding the borrowed and opaque pointers to incoming user data, one
- * per incoming address. These individual pointers will be returned as-is in
- * successful picks. */
- void **user_data_pointers;
/** all our subchannels */
size_t num_subchannels;
@@ -282,7 +278,6 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
elem = tmp;
}
- gpr_free(p->user_data_pointers);
gpr_free(p);
}
@@ -611,24 +606,31 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
- if (args->num_addresses == 0) return NULL;
+
+ /* Find the number of backend addresses. We ignore balancer
+ * addresses, since we don't know how to handle them. */
+ size_t num_addrs = 0;
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ if (!args->addresses->addresses[i].is_balancer) ++num_addrs;
+ }
+ if (num_addrs == 0) return NULL;
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->num_addresses = args->num_addresses;
- p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses);
- p->user_data_pointers = gpr_malloc(sizeof(void *) * p->num_addresses);
- memset(p->user_data_pointers, 0, sizeof(void *) * p->num_addresses);
+ p->num_addresses = num_addrs;
+ p->subchannels = gpr_malloc(sizeof(*p->subchannels) * num_addrs);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < p->num_addresses; i++) {
- memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr = args->addresses[i].resolved_address;
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ /* Skip balancer addresses, since we only know how to handle backends. */
+ if (args->addresses->addresses[i].is_balancer) continue;
- p->user_data_pointers[i] = args->addresses[i].user_data;
+ memset(&sc_args, 0, sizeof(grpc_subchannel_args));
+ sc_args.server_name = args->server_name;
+ sc_args.addr = &args->addresses->addresses[i].address;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
@@ -640,7 +642,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sd->policy = p;
sd->index = subchannel_idx;
sd->subchannel = subchannel;
- sd->user_data = p->user_data_pointers[i];
+ sd->user_data = args->addresses->addresses[i].user_data;
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 32e9de69a6..e8ac1b12ae 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -37,6 +37,7 @@
#include <grpc/support/host_port.h>
#include <grpc/support/string_util.h>
+#include "src/core/ext/client_config/http_connect_handshaker.h"
#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/resolve_address.h"
@@ -54,19 +55,19 @@ typedef struct {
grpc_resolver base;
/** refcount */
gpr_refcount refs;
- /** name to resolve */
- char *name;
+ /** target name */
+ char *target_name;
+ /** name to resolve (usually the same as target_name) */
+ char *name_to_resolve;
/** default port to use */
char *default_port;
- /** subchannel factory */
- grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
/** mutex guarding the rest of the state */
gpr_mu mu;
/** are we currently resolving? */
- int resolving;
+ bool resolving;
/** which version of the result have we published? */
int published_version;
/** which version of the result is current? */
@@ -166,32 +167,21 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
dns_resolver *r = arg;
grpc_resolver_result *result = NULL;
- grpc_lb_policy *lb_policy;
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
- r->resolving = 0;
- grpc_resolved_addresses *addresses = r->addresses;
- if (addresses != NULL) {
- grpc_lb_policy_args lb_policy_args;
- result = grpc_resolver_result_create();
- memset(&lb_policy_args, 0, sizeof(lb_policy_args));
- lb_policy_args.num_addresses = addresses->naddrs;
- lb_policy_args.addresses =
- gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
- memset(lb_policy_args.addresses, 0,
- sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
- for (size_t i = 0; i < addresses->naddrs; ++i) {
- lb_policy_args.addresses[i].resolved_address = &r->addresses->addrs[i];
+ r->resolving = false;
+ if (r->addresses != NULL) {
+ grpc_lb_addresses *addresses =
+ grpc_lb_addresses_create(r->addresses->naddrs);
+ for (size_t i = 0; i < r->addresses->naddrs; ++i) {
+ grpc_lb_addresses_set_address(
+ addresses, i, &r->addresses->addrs[i].addr,
+ r->addresses->addrs[i].len, false /* is_balancer */,
+ NULL /* balancer_name */, NULL /* user_data */);
}
- lb_policy_args.client_channel_factory = r->client_channel_factory;
- lb_policy =
- grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
- gpr_free(lb_policy_args.addresses);
- if (lb_policy != NULL) {
- grpc_resolver_result_set_lb_policy(result, lb_policy);
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
- }
- grpc_resolved_addresses_destroy(addresses);
+ grpc_resolved_addresses_destroy(r->addresses);
+ result = grpc_resolver_result_create(r->target_name, addresses,
+ r->lb_policy_name, NULL);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
@@ -226,9 +216,9 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
dns_resolver *r) {
GRPC_RESOLVER_REF(&r->base, "dns-resolving");
GPR_ASSERT(!r->resolving);
- r->resolving = 1;
+ r->resolving = true;
r->addresses = NULL;
- grpc_resolve_address(exec_ctx, r->name, r->default_port,
+ grpc_resolve_address(exec_ctx, r->name_to_resolve, r->default_port,
grpc_closure_create(dns_on_resolved, r), &r->addresses);
}
@@ -252,8 +242,8 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
if (r->resolved_result) {
grpc_resolver_result_unref(exec_ctx, r->resolved_result);
}
- grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
- gpr_free(r->name);
+ gpr_free(r->target_name);
+ gpr_free(r->name_to_resolve);
gpr_free(r->default_port);
gpr_free(r->lb_policy_name);
gpr_free(r);
@@ -262,27 +252,26 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
static grpc_resolver *dns_create(grpc_resolver_args *args,
const char *default_port,
const char *lb_policy_name) {
- dns_resolver *r;
- const char *path = args->uri->path;
-
if (0 != strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based dns uri's not supported");
return NULL;
}
-
+ // Get name from args.
+ const char *path = args->uri->path;
if (path[0] == '/') ++path;
-
- r = gpr_malloc(sizeof(dns_resolver));
+ // Get proxy name, if any.
+ char *proxy_name = grpc_get_http_proxy_server();
+ // Create resolver.
+ dns_resolver *r = gpr_malloc(sizeof(dns_resolver));
memset(r, 0, sizeof(*r));
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &dns_resolver_vtable);
- r->name = gpr_strdup(path);
+ r->target_name = gpr_strdup(path);
+ r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name;
r->default_port = gpr_strdup(default_port);
- r->client_channel_factory = args->client_channel_factory;
gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER,
BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000);
- grpc_client_channel_factory_ref(r->client_channel_factory);
r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;
}
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 8229c29a92..446586e33b 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -40,7 +40,6 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/parse_address.h"
#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/resolve_address.h"
@@ -52,18 +51,16 @@ typedef struct {
grpc_resolver base;
/** refcount */
gpr_refcount refs;
- /** subchannel factory */
- grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
/** the addresses that we've 'resolved' */
- grpc_resolved_addresses *addresses;
+ grpc_lb_addresses *addresses;
/** mutex guarding the rest of the state */
gpr_mu mu;
/** have we published? */
- int published;
+ bool published;
/** pending next completion, or NULL */
grpc_closure *next_completion;
/** target result address for next completion */
@@ -102,7 +99,7 @@ static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
grpc_resolver *resolver) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
gpr_mu_lock(&r->mu);
- r->published = 0;
+ r->published = false;
sockaddr_maybe_finish_next_locked(exec_ctx, r);
gpr_mu_unlock(&r->mu);
}
@@ -122,25 +119,10 @@ static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
sockaddr_resolver *r) {
if (r->next_completion != NULL && !r->published) {
- grpc_resolver_result *result = grpc_resolver_result_create();
- grpc_lb_policy_args lb_policy_args;
- memset(&lb_policy_args, 0, sizeof(lb_policy_args));
- lb_policy_args.num_addresses = r->addresses->naddrs;
- lb_policy_args.addresses =
- gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
- memset(lb_policy_args.addresses, 0,
- sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
- for (size_t i = 0; i < lb_policy_args.num_addresses; ++i) {
- lb_policy_args.addresses[i].resolved_address = &r->addresses->addrs[i];
- }
- lb_policy_args.client_channel_factory = r->client_channel_factory;
- grpc_lb_policy *lb_policy =
- grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
- gpr_free(lb_policy_args.addresses);
- grpc_resolver_result_set_lb_policy(result, lb_policy);
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
- r->published = 1;
- *r->target_result = result;
+ r->published = true;
+ *r->target_result = grpc_resolver_result_create(
+ "", grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
+ r->lb_policy_name, NULL);
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
}
@@ -149,8 +131,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
- grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
- grpc_resolved_addresses_destroy(r->addresses);
+ grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
gpr_free(r->lb_policy_name);
gpr_free(r);
}
@@ -183,7 +164,7 @@ static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(
grpc_resolver_args *args, const char *default_lb_policy_name,
int parse(grpc_uri *uri, grpc_resolved_address *dst)) {
- int errors_found = 0; /* GPR_FALSE */
+ bool errors_found = false;
sockaddr_resolver *r;
gpr_slice path_slice;
gpr_slice_buffer path_parts;
@@ -224,19 +205,16 @@ static grpc_resolver *sockaddr_create(
gpr_slice_buffer_init(&path_parts);
gpr_slice_split(path_slice, ",", &path_parts);
- r->addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
- r->addresses->naddrs = path_parts.count;
- r->addresses->addrs =
- gpr_malloc(sizeof(grpc_resolved_address) * r->addresses->naddrs);
-
- for (size_t i = 0; i < r->addresses->naddrs; i++) {
+ r->addresses = grpc_lb_addresses_create(path_parts.count);
+ for (size_t i = 0; i < r->addresses->num_addresses; i++) {
grpc_uri ith_uri = *args->uri;
char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
ith_uri.path = part_str;
- if (!parse(&ith_uri, &r->addresses->addrs[i])) {
- errors_found = 1; /* GPR_TRUE */
+ if (!parse(&ith_uri, &r->addresses->addresses[i].address)) {
+ errors_found = true; /* GPR_TRUE */
}
gpr_free(part_str);
+ r->addresses->addresses[i].is_balancer = lb_enabled;
if (errors_found) break;
}
@@ -244,7 +222,7 @@ static grpc_resolver *sockaddr_create(
gpr_slice_unref(path_slice);
if (errors_found) {
gpr_free(r->lb_policy_name);
- grpc_resolved_addresses_destroy(r->addresses);
+ grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
gpr_free(r);
return NULL;
}
@@ -252,8 +230,6 @@ static grpc_resolver *sockaddr_create(
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
- r->client_channel_factory = args->client_channel_factory;
- grpc_client_channel_factory_ref(r->client_channel_factory);
return &r->base;
}
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index acbc81370d..0d34247ae3 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -41,6 +41,7 @@
#include <grpc/support/slice_buffer.h>
#include "src/core/ext/client_config/client_channel.h"
+#include "src/core/ext/client_config/http_connect_handshaker.h"
#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
@@ -160,7 +161,6 @@ typedef struct {
grpc_client_channel_factory base;
gpr_refcount refs;
grpc_channel_args *merge_args;
- grpc_channel *master;
} client_channel_factory;
static void client_channel_factory_ref(
@@ -173,10 +173,6 @@ static void client_channel_factory_unref(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) {
- if (f->master != NULL) {
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
- "client_channel_factory");
- }
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
@@ -194,6 +190,13 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
c->base.vtable = &connector_vtable;
gpr_ref_init(&c->refs, 1);
c->handshake_mgr = grpc_handshake_manager_create();
+ char *proxy_name = grpc_get_http_proxy_server();
+ if (proxy_name != NULL) {
+ grpc_handshake_manager_add(
+ c->handshake_mgr,
+ grpc_http_connect_handshaker_create(proxy_name, args->server_name));
+ gpr_free(proxy_name);
+ }
args->args = final_args;
s = grpc_subchannel_create(exec_ctx, &c->base, args);
grpc_connector_unref(exec_ctx, &c->base);
@@ -210,15 +213,15 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
GRPC_CLIENT_CHANNEL, NULL);
grpc_channel_args_destroy(final_args);
- grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
+ grpc_resolver *resolver = grpc_resolver_create(target);
if (!resolver) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
"client_channel_factory_create_channel");
return NULL;
}
- grpc_client_channel_set_resolver(
- exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
+ grpc_client_channel_finish_initialization(
+ exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base);
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel");
return channel;
@@ -250,12 +253,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
grpc_channel *channel = client_channel_factory_create_channel(
&exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
- if (channel != NULL) {
- f->master = channel;
- GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_insecure_channel_create");
- }
- grpc_client_channel_factory_unref(&exec_ctx, &f->base);
+ grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
return channel != NULL ? channel : grpc_lame_client_channel_create(
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 344d56ff86..4240827c29 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -41,6 +41,7 @@
#include <grpc/support/slice_buffer.h>
#include "src/core/ext/client_config/client_channel.h"
+#include "src/core/ext/client_config/http_connect_handshaker.h"
#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
@@ -220,7 +221,6 @@ typedef struct {
gpr_refcount refs;
grpc_channel_args *merge_args;
grpc_channel_security_connector *security_connector;
- grpc_channel *master;
} client_channel_factory;
static void client_channel_factory_ref(
@@ -235,10 +235,6 @@ static void client_channel_factory_unref(
if (gpr_unref(&f->refs)) {
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
"client_channel_factory");
- if (f->master != NULL) {
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
- "client_channel_factory");
- }
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
@@ -256,6 +252,13 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
c->base.vtable = &connector_vtable;
c->security_connector = f->security_connector;
c->handshake_mgr = grpc_handshake_manager_create();
+ char *proxy_name = grpc_get_http_proxy_server();
+ if (proxy_name != NULL) {
+ grpc_handshake_manager_add(
+ c->handshake_mgr,
+ grpc_http_connect_handshaker_create(proxy_name, args->server_name));
+ gpr_free(proxy_name);
+ }
gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1);
args->args = final_args;
@@ -276,10 +279,10 @@ static grpc_channel *client_channel_factory_create_channel(
GRPC_CLIENT_CHANNEL, NULL);
grpc_channel_args_destroy(final_args);
- grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
+ grpc_resolver *resolver = grpc_resolver_create(target);
if (resolver != NULL) {
- grpc_client_channel_set_resolver(
- exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
+ grpc_client_channel_finish_initialization(
+ exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base);
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create");
} else {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
@@ -356,10 +359,6 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
grpc_channel *channel = client_channel_factory_create_channel(
&exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
- if (channel != NULL) {
- f->master = channel;
- GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_secure_channel_create");
- }
grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 53da0e5d70..1dd7fef76f 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -33,6 +33,7 @@
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include <limits.h>
#include <math.h>
#include <stdio.h>
#include <string.h>
@@ -46,6 +47,7 @@
#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
@@ -333,76 +335,65 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (is_client) {
gpr_log(GPR_ERROR, "%s: is ignored on the client",
GRPC_ARG_MAX_CONCURRENT_STREAMS);
- } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
- gpr_log(GPR_ERROR, "%s: must be an integer",
- GRPC_ARG_MAX_CONCURRENT_STREAMS);
} else {
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
- (uint32_t)channel_args->args[i].value.integer);
+ const grpc_integer_options options = {-1, 0, INT_MAX};
+ const int value =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ if (value >= 0) {
+ push_setting(exec_ctx, t,
+ GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
+ (uint32_t)value);
+ }
}
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
- if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
- gpr_log(GPR_ERROR, "%s: must be an integer",
- GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER);
- } else if ((t->global.next_stream_id & 1) !=
- (channel_args->args[i].value.integer & 1)) {
- gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
- GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER,
- t->global.next_stream_id & 1,
- is_client ? "client" : "server");
- } else {
- t->global.next_stream_id =
- (uint32_t)channel_args->args[i].value.integer;
+ const grpc_integer_options options = {-1, 0, INT_MAX};
+ const int value =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ if (value >= 0) {
+ if ((t->global.next_stream_id & 1) != (value & 1)) {
+ gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
+ GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER,
+ t->global.next_stream_id & 1,
+ is_client ? "client" : "server");
+ } else {
+ t->global.next_stream_id = (uint32_t)value;
+ }
}
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES)) {
- if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
- gpr_log(GPR_ERROR, "%s: must be an integer",
- GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES);
- } else if (channel_args->args[i].value.integer <= 5) {
- gpr_log(GPR_ERROR, "%s: must be at least 5",
- GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES);
- } else {
- t->global.stream_lookahead =
- (uint32_t)channel_args->args[i].value.integer;
+ const grpc_integer_options options = {-1, 5, INT_MAX};
+ const int value =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ if (value >= 0) {
+ t->global.stream_lookahead = (uint32_t)value;
}
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER)) {
- if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
- gpr_log(GPR_ERROR, "%s: must be an integer",
- GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER);
- } else if (channel_args->args[i].value.integer < 0) {
- gpr_log(GPR_ERROR, "%s: must be non-negative",
- GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER);
- } else {
+ const grpc_integer_options options = {-1, 0, INT_MAX};
+ const int value =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ if (value >= 0) {
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
- (uint32_t)channel_args->args[i].value.integer);
+ (uint32_t)value);
}
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
- if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
- gpr_log(GPR_ERROR, "%s: must be an integer",
- GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER);
- } else if (channel_args->args[i].value.integer < 0) {
- gpr_log(GPR_ERROR, "%s: must be non-negative",
- GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER);
- } else {
+ const grpc_integer_options options = {-1, 0, INT_MAX};
+ const int value =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ if (value >= 0) {
grpc_chttp2_hpack_compressor_set_max_usable_size(
- &t->writing.hpack_compressor,
- (uint32_t)channel_args->args[i].value.integer);
+ &t->writing.hpack_compressor, (uint32_t)value);
}
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_MAX_METADATA_SIZE)) {
- if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
- gpr_log(GPR_ERROR, "%s: must be an integer",
- GRPC_ARG_MAX_METADATA_SIZE);
- } else if (channel_args->args[i].value.integer < 0) {
- gpr_log(GPR_ERROR, "%s: must be non-negative",
- GRPC_ARG_MAX_METADATA_SIZE);
- } else {
+ const grpc_integer_options options = {-1, 0, INT_MAX};
+ const int value =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ if (value >= 0) {
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
- (uint32_t)channel_args->args[i].value.integer);
+ (uint32_t)value);
}
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_MAX_FRAME_SIZE)) {
@@ -1937,7 +1928,8 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
grpc_error *parse_error = GRPC_ERROR_NONE;
for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
- parse_error = grpc_http_parser_parse(&parser, t->read_buffer.slices[i]);
+ parse_error =
+ grpc_http_parser_parse(&parser, t->read_buffer.slices[i], NULL);
}
if (parse_error == GRPC_ERROR_NONE &&
(parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {