aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/client_channel_factory.cc2
-rw-r--r--src/core/ext/filters/client_channel/client_channel_factory.h4
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h5
-rw-r--r--src/core/ext/filters/client_channel/parse_address.cc29
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc36
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h15
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.cc32
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.h3
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.cc10
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc44
-rw-r--r--src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc16
-rw-r--r--src/core/lib/iomgr/cfstream_handle.cc97
-rw-r--r--src/core/lib/iomgr/cfstream_handle.h2
-rw-r--r--src/core/lib/iomgr/grpc_if_nametoindex.h30
-rw-r--r--src/core/lib/iomgr/grpc_if_nametoindex_posix.cc42
-rw-r--r--src/core/lib/iomgr/grpc_if_nametoindex_unsupported.cc38
-rw-r--r--src/core/lib/iomgr/port.h1
-rw-r--r--src/core/lib/iomgr/resource_quota.cc1
-rw-r--r--src/core/lib/iomgr/tcp_windows.cc94
-rw-r--r--src/core/lib/surface/channel_init.h5
-rw-r--r--src/core/lib/surface/server.cc104
-rw-r--r--src/core/lib/surface/version.cc2
-rw-r--r--src/core/lib/transport/metadata.cc1
23 files changed, 360 insertions, 253 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel_factory.cc b/src/core/ext/filters/client_channel/client_channel_factory.cc
index 172e9f03c7..130bbe0418 100644
--- a/src/core/ext/filters/client_channel/client_channel_factory.cc
+++ b/src/core/ext/filters/client_channel/client_channel_factory.cc
@@ -30,7 +30,7 @@ void grpc_client_channel_factory_unref(grpc_client_channel_factory* factory) {
}
grpc_subchannel* grpc_client_channel_factory_create_subchannel(
- grpc_client_channel_factory* factory, const grpc_subchannel_args* args) {
+ grpc_client_channel_factory* factory, const grpc_channel_args* args) {
return factory->vtable->create_subchannel(factory, args);
}
diff --git a/src/core/ext/filters/client_channel/client_channel_factory.h b/src/core/ext/filters/client_channel/client_channel_factory.h
index 601ec46b2a..91dec12282 100644
--- a/src/core/ext/filters/client_channel/client_channel_factory.h
+++ b/src/core/ext/filters/client_channel/client_channel_factory.h
@@ -49,7 +49,7 @@ struct grpc_client_channel_factory_vtable {
void (*ref)(grpc_client_channel_factory* factory);
void (*unref)(grpc_client_channel_factory* factory);
grpc_subchannel* (*create_subchannel)(grpc_client_channel_factory* factory,
- const grpc_subchannel_args* args);
+ const grpc_channel_args* args);
grpc_channel* (*create_client_channel)(grpc_client_channel_factory* factory,
const char* target,
grpc_client_channel_type type,
@@ -61,7 +61,7 @@ void grpc_client_channel_factory_unref(grpc_client_channel_factory* factory);
/** Create a new grpc_subchannel */
grpc_subchannel* grpc_client_channel_factory_create_subchannel(
- grpc_client_channel_factory* factory, const grpc_subchannel_args* args);
+ grpc_client_channel_factory* factory, const grpc_channel_args* args);
/** Create a new grpc_channel */
grpc_channel* grpc_client_channel_factory_create_channel(
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 6f31a643c1..1d0ecbe3f6 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -509,12 +509,10 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
GRPC_ARG_SERVER_ADDRESS_LIST,
GRPC_ARG_INHIBIT_HEALTH_CHECKING};
// Create a subchannel for each address.
- grpc_subchannel_args sc_args;
for (size_t i = 0; i < addresses.size(); i++) {
// If there were any balancer addresses, we would have chosen grpclb
// policy, which does not use a SubchannelList.
GPR_ASSERT(!addresses[i].IsBalancer());
- memset(&sc_args, 0, sizeof(grpc_subchannel_args));
InlinedVector<grpc_arg, 4> args_to_add;
args_to_add.emplace_back(
grpc_create_subchannel_address_arg(&addresses[i].address()));
@@ -527,9 +525,8 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
&args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
args_to_add.data(), args_to_add.size());
gpr_free(args_to_add[0].value.string);
- sc_args.args = new_args;
grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
- client_channel_factory, &sc_args);
+ client_channel_factory, new_args);
grpc_channel_args_destroy(new_args);
if (subchannel == nullptr) {
// Subchannel could not be created.
diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc
index 707beb8876..c5e1ed811b 100644
--- a/src/core/ext/filters/client_channel/parse_address.cc
+++ b/src/core/ext/filters/client_channel/parse_address.cc
@@ -19,6 +19,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/lib/iomgr/grpc_if_nametoindex.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils.h"
@@ -35,6 +36,11 @@
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
+#ifdef GRPC_POSIX_SOCKET
+#include <errno.h>
+#include <net/if.h>
+#endif
+
#ifdef GRPC_HAVE_UNIX_SOCKET
bool grpc_parse_unix(const grpc_uri* uri,
@@ -69,7 +75,12 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr,
// Split host and port.
char* host;
char* port;
- if (!gpr_split_host_port(hostport, &host, &port)) return false;
+ if (!gpr_split_host_port(hostport, &host, &port)) {
+ if (log_errors) {
+ gpr_log(GPR_ERROR, "Failed gpr_split_host_port(%s, ...)", hostport);
+ }
+ return false;
+ }
// Parse IP address.
memset(addr, 0, sizeof(*addr));
addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
@@ -115,7 +126,12 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
// Split host and port.
char* host;
char* port;
- if (!gpr_split_host_port(hostport, &host, &port)) return false;
+ if (!gpr_split_host_port(hostport, &host, &port)) {
+ if (log_errors) {
+ gpr_log(GPR_ERROR, "Failed gpr_split_host_port(%s, ...)", hostport);
+ }
+ return false;
+ }
// Parse IP address.
memset(addr, 0, sizeof(*addr));
addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
@@ -150,10 +166,13 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
if (gpr_parse_bytes_to_uint32(host_end + 1,
strlen(host) - host_without_scope_len - 1,
&sin6_scope_id) == 0) {
- if (log_errors) {
- gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1);
+ if ((sin6_scope_id = grpc_if_nametoindex(host_end + 1)) == 0) {
+ gpr_log(GPR_ERROR,
+ "Invalid interface name: '%s'. "
+ "Non-numeric and failed if_nametoindex.",
+ host_end + 1);
+ goto done;
}
- goto done;
}
// Handle "sin6_scope_id" being type "u_long". See grpc issue #10027.
in6->sin6_scope_id = sin6_scope_id;
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 9077aa9753..640a052e91 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -64,18 +64,6 @@
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
-namespace {
-struct state_watcher {
- grpc_closure closure;
- grpc_subchannel* subchannel;
- grpc_connectivity_state connectivity_state;
- grpc_connectivity_state last_connectivity_state;
- grpc_core::OrphanablePtr<grpc_core::HealthCheckClient> health_check_client;
- grpc_closure health_check_closure;
- grpc_connectivity_state health_state;
-};
-} // namespace
-
typedef struct external_state_watcher {
grpc_subchannel* subchannel;
grpc_pollset_set* pollset_set;
@@ -101,9 +89,6 @@ struct grpc_subchannel {
keep the subchannel open */
gpr_atm ref_pair;
- /** non-transport related channel filters */
- const grpc_channel_filter** filters;
- size_t num_filters;
/** channel arguments */
grpc_channel_args* args;
@@ -384,7 +369,6 @@ static void subchannel_destroy(void* arg, grpc_error* error) {
c->channelz_subchannel->MarkSubchannelDestroyed();
c->channelz_subchannel.reset();
}
- gpr_free((void*)c->filters);
c->health_check_service_name.reset();
grpc_channel_args_destroy(c->args);
grpc_connectivity_state_destroy(&c->state_tracker);
@@ -553,7 +537,7 @@ struct HealthCheckParams {
} // namespace grpc_core
grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
- const grpc_subchannel_args* args) {
+ const grpc_channel_args* args) {
grpc_subchannel_key* key = grpc_subchannel_key_create(args);
grpc_subchannel* c = grpc_subchannel_index_find(key);
if (c) {
@@ -567,23 +551,13 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
c->connector = connector;
grpc_connector_ref(c->connector);
- c->num_filters = args->filter_count;
- if (c->num_filters > 0) {
- c->filters = static_cast<const grpc_channel_filter**>(
- gpr_malloc(sizeof(grpc_channel_filter*) * c->num_filters));
- memcpy((void*)c->filters, args->filters,
- sizeof(grpc_channel_filter*) * c->num_filters);
- } else {
- c->filters = nullptr;
- }
c->pollset_set = grpc_pollset_set_create();
grpc_resolved_address* addr =
static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr)));
- grpc_get_subchannel_address_arg(args->args, addr);
+ grpc_get_subchannel_address_arg(args, addr);
grpc_resolved_address* new_address = nullptr;
grpc_channel_args* new_args = nullptr;
- if (grpc_proxy_mappers_map_address(addr, args->args, &new_address,
- &new_args)) {
+ if (grpc_proxy_mappers_map_address(addr, args, &new_address, &new_args)) {
GPR_ASSERT(new_address != nullptr);
gpr_free(addr);
addr = new_address;
@@ -592,7 +566,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
grpc_arg new_arg = grpc_create_subchannel_address_arg(addr);
gpr_free(addr);
c->args = grpc_channel_args_copy_and_add_and_remove(
- new_args != nullptr ? new_args : args->args, keys_to_remove,
+ new_args != nullptr ? new_args : args, keys_to_remove,
GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
gpr_free(new_arg.value.string);
if (new_args != nullptr) grpc_channel_args_destroy(new_args);
@@ -605,7 +579,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
grpc_connectivity_state_init(&c->state_and_health_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
grpc_core::BackOff::Options backoff_options;
- parse_args_for_backoff_values(args->args, &backoff_options,
+ parse_args_for_backoff_values(args, &backoff_options,
&c->min_connect_timeout_ms);
c->backoff.Init(backoff_options);
gpr_mu_init(&c->mu);
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 14f87f2c68..8c994c64f5 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -38,7 +38,6 @@
address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
-typedef struct grpc_subchannel_args grpc_subchannel_args;
typedef struct grpc_subchannel_key grpc_subchannel_key;
#ifndef NDEBUG
@@ -186,21 +185,9 @@ void grpc_subchannel_call_set_cleanup_closure(
grpc_call_stack* grpc_subchannel_call_get_call_stack(
grpc_subchannel_call* subchannel_call);
-struct grpc_subchannel_args {
- /* When updating this struct, also update subchannel_index.c */
-
- /** Channel filters for this channel - wrapped factories will likely
- want to mutate this */
- const grpc_channel_filter** filters;
- /** The number of filters in the above array */
- size_t filter_count;
- /** Channel arguments to be supplied to the newly created channel */
- const grpc_channel_args* args;
-};
-
/** create a subchannel given a connector */
grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
- const grpc_subchannel_args* args);
+ const grpc_channel_args* args);
/// Sets \a addr from \a args.
void grpc_get_subchannel_address_arg(const grpc_channel_args* args,
diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc
index aa8441f17b..d0ceda8312 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.cc
+++ b/src/core/ext/filters/client_channel/subchannel_index.cc
@@ -39,55 +39,37 @@ static gpr_mu g_mu;
static gpr_refcount g_refcount;
struct grpc_subchannel_key {
- grpc_subchannel_args args;
+ grpc_channel_args* args;
};
static bool g_force_creation = false;
static grpc_subchannel_key* create_key(
- const grpc_subchannel_args* args,
+ const grpc_channel_args* args,
grpc_channel_args* (*copy_channel_args)(const grpc_channel_args* args)) {
grpc_subchannel_key* k =
static_cast<grpc_subchannel_key*>(gpr_malloc(sizeof(*k)));
- k->args.filter_count = args->filter_count;
- if (k->args.filter_count > 0) {
- k->args.filters = static_cast<const grpc_channel_filter**>(
- gpr_malloc(sizeof(*k->args.filters) * k->args.filter_count));
- memcpy(reinterpret_cast<grpc_channel_filter*>(k->args.filters),
- args->filters, sizeof(*k->args.filters) * k->args.filter_count);
- } else {
- k->args.filters = nullptr;
- }
- k->args.args = copy_channel_args(args->args);
+ k->args = copy_channel_args(args);
return k;
}
-grpc_subchannel_key* grpc_subchannel_key_create(
- const grpc_subchannel_args* args) {
+grpc_subchannel_key* grpc_subchannel_key_create(const grpc_channel_args* args) {
return create_key(args, grpc_channel_args_normalize);
}
static grpc_subchannel_key* subchannel_key_copy(grpc_subchannel_key* k) {
- return create_key(&k->args, grpc_channel_args_copy);
+ return create_key(k->args, grpc_channel_args_copy);
}
int grpc_subchannel_key_compare(const grpc_subchannel_key* a,
const grpc_subchannel_key* b) {
// To pretend the keys are different, return a non-zero value.
if (GPR_UNLIKELY(g_force_creation)) return 1;
- int c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
- if (c != 0) return c;
- if (a->args.filter_count > 0) {
- c = memcmp(a->args.filters, b->args.filters,
- a->args.filter_count * sizeof(*a->args.filters));
- if (c != 0) return c;
- }
- return grpc_channel_args_compare(a->args.args, b->args.args);
+ return grpc_channel_args_compare(a->args, b->args);
}
void grpc_subchannel_key_destroy(grpc_subchannel_key* k) {
- gpr_free(reinterpret_cast<grpc_channel_args*>(k->args.filters));
- grpc_channel_args_destroy(const_cast<grpc_channel_args*>(k->args.args));
+ grpc_channel_args_destroy(k->args);
gpr_free(k);
}
diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h
index c135613d26..429634bd54 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.h
+++ b/src/core/ext/filters/client_channel/subchannel_index.h
@@ -27,8 +27,7 @@
shared amongst channels */
/** Create a key that can be used to uniquely identify a subchannel */
-grpc_subchannel_key* grpc_subchannel_key_create(
- const grpc_subchannel_args* args);
+grpc_subchannel_key* grpc_subchannel_key_create(const grpc_channel_args* args);
/** Destroy a subchannel key */
void grpc_subchannel_key_destroy(grpc_subchannel_key* key);
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc
index e6c8c38260..a5bf1bf21d 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc
@@ -40,14 +40,12 @@ static void client_channel_factory_unref(
grpc_client_channel_factory* cc_factory) {}
static grpc_subchannel* client_channel_factory_create_subchannel(
- grpc_client_channel_factory* cc_factory, const grpc_subchannel_args* args) {
- grpc_subchannel_args final_sc_args;
- memcpy(&final_sc_args, args, sizeof(*args));
- final_sc_args.args = grpc_default_authority_add_if_not_present(args->args);
+ grpc_client_channel_factory* cc_factory, const grpc_channel_args* args) {
+ grpc_channel_args* new_args = grpc_default_authority_add_if_not_present(args);
grpc_connector* connector = grpc_chttp2_connector_create();
- grpc_subchannel* s = grpc_subchannel_create(connector, &final_sc_args);
+ grpc_subchannel* s = grpc_subchannel_create(connector, new_args);
grpc_connector_unref(connector);
- grpc_channel_args_destroy(const_cast<grpc_channel_args*>(final_sc_args.args));
+ grpc_channel_args_destroy(new_args);
return s;
}
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
index 9612698e96..ddd538faa8 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
@@ -46,10 +46,10 @@ static void client_channel_factory_ref(
static void client_channel_factory_unref(
grpc_client_channel_factory* cc_factory) {}
-static grpc_subchannel_args* get_secure_naming_subchannel_args(
- const grpc_subchannel_args* args) {
+static grpc_channel_args* get_secure_naming_channel_args(
+ const grpc_channel_args* args) {
grpc_channel_credentials* channel_credentials =
- grpc_channel_credentials_find_in_args(args->args);
+ grpc_channel_credentials_find_in_args(args);
if (channel_credentials == nullptr) {
gpr_log(GPR_ERROR,
"Can't create subchannel: channel credentials missing for secure "
@@ -57,7 +57,7 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
return nullptr;
}
// Make sure security connector does not already exist in args.
- if (grpc_security_connector_find_in_args(args->args) != nullptr) {
+ if (grpc_security_connector_find_in_args(args) != nullptr) {
gpr_log(GPR_ERROR,
"Can't create subchannel: security connector already present in "
"channel args.");
@@ -65,19 +65,18 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
}
// To which address are we connecting? By default, use the server URI.
const grpc_arg* server_uri_arg =
- grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
+ grpc_channel_args_find(args, GRPC_ARG_SERVER_URI);
const char* server_uri_str = grpc_channel_arg_get_string(server_uri_arg);
GPR_ASSERT(server_uri_str != nullptr);
grpc_uri* server_uri =
grpc_uri_parse(server_uri_str, true /* supress errors */);
GPR_ASSERT(server_uri != nullptr);
const grpc_core::TargetAuthorityTable* target_authority_table =
- grpc_core::FindTargetAuthorityTableInArgs(args->args);
+ grpc_core::FindTargetAuthorityTableInArgs(args);
grpc_core::UniquePtr<char> authority;
if (target_authority_table != nullptr) {
// Find the authority for the target.
- const char* target_uri_str =
- grpc_get_subchannel_address_uri_arg(args->args);
+ const char* target_uri_str = grpc_get_subchannel_address_uri_arg(args);
grpc_uri* target_uri =
grpc_uri_parse(target_uri_str, false /* suppress errors */);
GPR_ASSERT(target_uri != nullptr);
@@ -100,15 +99,14 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
}
grpc_arg args_to_add[2];
size_t num_args_to_add = 0;
- if (grpc_channel_args_find(args->args, GRPC_ARG_DEFAULT_AUTHORITY) ==
- nullptr) {
+ if (grpc_channel_args_find(args, GRPC_ARG_DEFAULT_AUTHORITY) == nullptr) {
// If the channel args don't already contain GRPC_ARG_DEFAULT_AUTHORITY, add
// the arg, setting it to the value just obtained.
args_to_add[num_args_to_add++] = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY), authority.get());
}
grpc_channel_args* args_with_authority =
- grpc_channel_args_copy_and_add(args->args, args_to_add, num_args_to_add);
+ grpc_channel_args_copy_and_add(args, args_to_add, num_args_to_add);
grpc_uri_destroy(server_uri);
// Create the security connector using the credentials and target name.
grpc_channel_args* new_args_from_connector = nullptr;
@@ -137,29 +135,21 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
grpc_channel_args_destroy(new_args_from_connector);
}
grpc_channel_args_destroy(args_with_authority);
- grpc_subchannel_args* final_sc_args =
- static_cast<grpc_subchannel_args*>(gpr_malloc(sizeof(*final_sc_args)));
- memcpy(final_sc_args, args, sizeof(*args));
- final_sc_args->args = new_args;
- return final_sc_args;
+ return new_args;
}
static grpc_subchannel* client_channel_factory_create_subchannel(
- grpc_client_channel_factory* cc_factory, const grpc_subchannel_args* args) {
- grpc_subchannel_args* subchannel_args =
- get_secure_naming_subchannel_args(args);
- if (subchannel_args == nullptr) {
- gpr_log(
- GPR_ERROR,
- "Failed to create subchannel arguments during subchannel creation.");
+ grpc_client_channel_factory* cc_factory, const grpc_channel_args* args) {
+ grpc_channel_args* new_args = get_secure_naming_channel_args(args);
+ if (new_args == nullptr) {
+ gpr_log(GPR_ERROR,
+ "Failed to create channel args during subchannel creation.");
return nullptr;
}
grpc_connector* connector = grpc_chttp2_connector_create();
- grpc_subchannel* s = grpc_subchannel_create(connector, subchannel_args);
+ grpc_subchannel* s = grpc_subchannel_create(connector, new_args);
grpc_connector_unref(connector);
- grpc_channel_args_destroy(
- const_cast<grpc_channel_args*>(subchannel_args->args));
- gpr_free(subchannel_args);
+ grpc_channel_args_destroy(new_args);
return s;
}
diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc
index 40a30e4a31..6e08d27b21 100644
--- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc
+++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc
@@ -46,9 +46,21 @@ GRPCAPI grpc_channel* grpc_cronet_secure_channel_create(
"grpc_create_cronet_transport: stream_engine = %p, target=%s", engine,
target);
+ // Disable client authority filter when using Cronet
+ grpc_arg disable_client_authority_filter_arg;
+ disable_client_authority_filter_arg.key =
+ const_cast<char*>(GRPC_ARG_DISABLE_CLIENT_AUTHORITY_FILTER);
+ disable_client_authority_filter_arg.type = GRPC_ARG_INTEGER;
+ disable_client_authority_filter_arg.value.integer = 1;
+ grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
+ args, &disable_client_authority_filter_arg, 1);
+
grpc_transport* ct =
- grpc_create_cronet_transport(engine, target, args, reserved);
+ grpc_create_cronet_transport(engine, target, new_args, reserved);
grpc_core::ExecCtx exec_ctx;
- return grpc_channel_create(target, args, GRPC_CLIENT_DIRECT_CHANNEL, ct);
+ grpc_channel* channel =
+ grpc_channel_create(target, new_args, GRPC_CLIENT_DIRECT_CHANNEL, ct);
+ grpc_channel_args_destroy(new_args);
+ return channel;
}
diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc
index 827fd24831..6cb9ca1a0d 100644
--- a/src/core/lib/iomgr/cfstream_handle.cc
+++ b/src/core/lib/iomgr/cfstream_handle.cc
@@ -52,62 +52,52 @@ CFStreamHandle* CFStreamHandle::CreateStreamHandle(
void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
CFStreamEventType type,
void* client_callback_info) {
+ grpc_core::ExecCtx exec_ctx;
CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
- CFSTREAM_HANDLE_REF(handle, "read callback");
- dispatch_async(
- dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
- grpc_core::ExecCtx exec_ctx;
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
- stream, type, client_callback_info);
- }
- switch (type) {
- case kCFStreamEventOpenCompleted:
- handle->open_event_.SetReady();
- break;
- case kCFStreamEventHasBytesAvailable:
- case kCFStreamEventEndEncountered:
- handle->read_event_.SetReady();
- break;
- case kCFStreamEventErrorOccurred:
- handle->open_event_.SetReady();
- handle->read_event_.SetReady();
- break;
- default:
- GPR_UNREACHABLE_CODE(return );
- }
- CFSTREAM_HANDLE_UNREF(handle, "read callback");
- });
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
+ stream, type, client_callback_info);
+ }
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ handle->open_event_.SetReady();
+ break;
+ case kCFStreamEventHasBytesAvailable:
+ case kCFStreamEventEndEncountered:
+ handle->read_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred:
+ handle->open_event_.SetReady();
+ handle->read_event_.SetReady();
+ break;
+ default:
+ GPR_UNREACHABLE_CODE(return );
+ }
}
void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
CFStreamEventType type,
void* clientCallBackInfo) {
+ grpc_core::ExecCtx exec_ctx;
CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
- CFSTREAM_HANDLE_REF(handle, "write callback");
- dispatch_async(
- dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
- grpc_core::ExecCtx exec_ctx;
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
- stream, type, clientCallBackInfo);
- }
- switch (type) {
- case kCFStreamEventOpenCompleted:
- handle->open_event_.SetReady();
- break;
- case kCFStreamEventCanAcceptBytes:
- case kCFStreamEventEndEncountered:
- handle->write_event_.SetReady();
- break;
- case kCFStreamEventErrorOccurred:
- handle->open_event_.SetReady();
- handle->write_event_.SetReady();
- break;
- default:
- GPR_UNREACHABLE_CODE(return );
- }
- CFSTREAM_HANDLE_UNREF(handle, "write callback");
- });
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
+ stream, type, clientCallBackInfo);
+ }
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ handle->open_event_.SetReady();
+ break;
+ case kCFStreamEventCanAcceptBytes:
+ case kCFStreamEventEndEncountered:
+ handle->write_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred:
+ handle->open_event_.SetReady();
+ handle->write_event_.SetReady();
+ break;
+ default:
+ GPR_UNREACHABLE_CODE(return );
+ }
}
CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
@@ -116,6 +106,7 @@ CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
open_event_.InitEvent();
read_event_.InitEvent();
write_event_.InitEvent();
+ dispatch_queue_ = dispatch_queue_create(nullptr, DISPATCH_QUEUE_SERIAL);
CFStreamClientContext ctx = {0, static_cast<void*>(this),
CFStreamHandle::Retain, CFStreamHandle::Release,
nil};
@@ -129,10 +120,8 @@ CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
CFStreamHandle::WriteCallback, &ctx);
- CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(),
- kCFRunLoopCommonModes);
- CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(),
- kCFRunLoopCommonModes);
+ CFReadStreamSetDispatchQueue(read_stream, dispatch_queue_);
+ CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue_);
}
CFStreamHandle::~CFStreamHandle() {
diff --git a/src/core/lib/iomgr/cfstream_handle.h b/src/core/lib/iomgr/cfstream_handle.h
index 4258e72431..93ec5f044b 100644
--- a/src/core/lib/iomgr/cfstream_handle.h
+++ b/src/core/lib/iomgr/cfstream_handle.h
@@ -62,6 +62,8 @@ class CFStreamHandle final {
grpc_core::LockfreeEvent read_event_;
grpc_core::LockfreeEvent write_event_;
+ dispatch_queue_t dispatch_queue_;
+
gpr_refcount refcount_;
};
diff --git a/src/core/lib/iomgr/grpc_if_nametoindex.h b/src/core/lib/iomgr/grpc_if_nametoindex.h
new file mode 100644
index 0000000000..ed9612dcb9
--- /dev/null
+++ b/src/core/lib/iomgr/grpc_if_nametoindex.h
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_GRPC_IF_NAMETOINDEX_H
+#define GRPC_CORE_LIB_IOMGR_GRPC_IF_NAMETOINDEX_H
+
+#include <grpc/support/port_platform.h>
+
+#include <stddef.h>
+
+/* Returns the interface index corresponding to the interface "name" provided.
+ * Returns non-zero upon success, and zero upon failure. */
+uint32_t grpc_if_nametoindex(char* name);
+
+#endif /* GRPC_CORE_LIB_IOMGR_GRPC_IF_NAMETOINDEX_H */
diff --git a/src/core/lib/iomgr/grpc_if_nametoindex_posix.cc b/src/core/lib/iomgr/grpc_if_nametoindex_posix.cc
new file mode 100644
index 0000000000..f1ba20dcec
--- /dev/null
+++ b/src/core/lib/iomgr/grpc_if_nametoindex_posix.cc
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#if GRPC_IF_NAMETOINDEX == 1 && defined(GRPC_POSIX_SOCKET_IF_NAMETOINDEX)
+
+#include "src/core/lib/iomgr/grpc_if_nametoindex.h"
+
+#include <errno.h>
+#include <net/if.h>
+
+#include <grpc/support/log.h>
+
+uint32_t grpc_if_nametoindex(char* name) {
+ uint32_t out = if_nametoindex(name);
+ if (out == 0) {
+ gpr_log(GPR_DEBUG, "if_nametoindex failed for name %s. errno %d", name,
+ errno);
+ }
+ return out;
+}
+
+#endif /* GRPC_IF_NAMETOINDEX == 1 && \
+ defined(GRPC_POSIX_SOCKET_IF_NAMETOINDEX) */
diff --git a/src/core/lib/iomgr/grpc_if_nametoindex_unsupported.cc b/src/core/lib/iomgr/grpc_if_nametoindex_unsupported.cc
new file mode 100644
index 0000000000..08644cccf3
--- /dev/null
+++ b/src/core/lib/iomgr/grpc_if_nametoindex_unsupported.cc
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#if GRPC_IF_NAMETOINDEX == 0 || !defined(GRPC_POSIX_SOCKET_IF_NAMETOINDEX)
+
+#include "src/core/lib/iomgr/grpc_if_nametoindex.h"
+
+#include <grpc/support/log.h>
+
+uint32_t grpc_if_nametoindex(char* name) {
+ gpr_log(GPR_DEBUG,
+ "Not attempting to convert interface name %s to index for current "
+ "platform.",
+ name);
+ return 0;
+}
+
+#endif /* GRPC_IF_NAMETOINDEX == 0 || \
+ !defined(GRPC_POSIX_SOCKET_IF_NAMETOINDEX) */
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index c8046b21dc..7b6ca1bc0e 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -184,6 +184,7 @@
#define GRPC_POSIX_SOCKET_EV_EPOLLEX 1
#define GRPC_POSIX_SOCKET_EV_POLL 1
#define GRPC_POSIX_SOCKET_EV_EPOLL1 1
+#define GRPC_POSIX_SOCKET_IF_NAMETOINDEX 1
#define GRPC_POSIX_SOCKET_IOMGR 1
#define GRPC_POSIX_SOCKET_RESOLVE_ADDRESS 1
#define GRPC_POSIX_SOCKET_SOCKADDR 1
diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc
index 7e4b3c9b2f..61c366098e 100644
--- a/src/core/lib/iomgr/resource_quota.cc
+++ b/src/core/lib/iomgr/resource_quota.cc
@@ -665,6 +665,7 @@ void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
GPR_ASSERT(resource_quota->num_threads_allocated == 0);
GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
gpr_free(resource_quota->name);
+ gpr_mu_destroy(&resource_quota->thread_count_mu);
gpr_free(resource_quota);
}
}
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index 4b5250803d..86ee1010cf 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -42,6 +42,7 @@
#include "src/core/lib/iomgr/tcp_windows.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
#if defined(__MSYS__) && defined(GPR_ARCH_64)
/* Nasty workaround for nasty bug when using the 64 bits msys compiler
@@ -112,7 +113,10 @@ typedef struct grpc_tcp {
grpc_closure* read_cb;
grpc_closure* write_cb;
- grpc_slice read_slice;
+
+ /* garbage after the last read */
+ grpc_slice_buffer last_read_buffer;
+
grpc_slice_buffer* write_slices;
grpc_slice_buffer* read_slices;
@@ -131,6 +135,7 @@ static void tcp_free(grpc_tcp* tcp) {
grpc_winsocket_destroy(tcp->socket);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
+ grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
grpc_resource_user_unref(tcp->resource_user);
if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error);
gpr_free(tcp);
@@ -179,9 +184,12 @@ static void on_read(void* tcpp, grpc_error* error) {
grpc_tcp* tcp = (grpc_tcp*)tcpp;
grpc_closure* cb = tcp->read_cb;
grpc_winsocket* socket = tcp->socket;
- grpc_slice sub;
grpc_winsocket_callback_info* info = &socket->read_info;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_INFO, "TCP:%p on_read", tcp);
+ }
+
GRPC_ERROR_REF(error);
if (error == GRPC_ERROR_NONE) {
@@ -189,13 +197,35 @@ static void on_read(void* tcpp, grpc_error* error) {
char* utf8_message = gpr_format_message(info->wsa_error);
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(utf8_message);
gpr_free(utf8_message);
- grpc_slice_unref_internal(tcp->read_slice);
+ grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
} else {
if (info->bytes_transfered != 0 && !tcp->shutting_down) {
- sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
- grpc_slice_buffer_add(tcp->read_slices, sub);
+ GPR_ASSERT((size_t)info->bytes_transfered <= tcp->read_slices->length);
+ if (static_cast<size_t>(info->bytes_transfered) !=
+ tcp->read_slices->length) {
+ grpc_slice_buffer_trim_end(
+ tcp->read_slices,
+ tcp->read_slices->length -
+ static_cast<size_t>(info->bytes_transfered),
+ &tcp->last_read_buffer);
+ }
+ GPR_ASSERT((size_t)info->bytes_transfered == tcp->read_slices->length);
+
+ if (grpc_tcp_trace.enabled()) {
+ size_t i;
+ for (i = 0; i < tcp->read_slices->count; i++) {
+ char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_INFO, "READ %p (peer=%s): %s", tcp, tcp->peer_string,
+ dump);
+ gpr_free(dump);
+ }
+ }
} else {
- grpc_slice_unref_internal(tcp->read_slice);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_INFO, "TCP:%p unref read_slice", tcp);
+ }
+ grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
error = tcp->shutting_down
? GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP stream shutting down", &tcp->shutdown_error, 1)
@@ -209,6 +239,8 @@ static void on_read(void* tcpp, grpc_error* error) {
GRPC_CLOSURE_SCHED(cb, error);
}
+#define DEFAULT_TARGET_READ_SIZE 8192
+#define MAX_WSABUF_COUNT 16
static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
grpc_closure* cb) {
grpc_tcp* tcp = (grpc_tcp*)ep;
@@ -217,7 +249,12 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
int status;
DWORD bytes_read = 0;
DWORD flags = 0;
- WSABUF buffer;
+ WSABUF buffers[MAX_WSABUF_COUNT];
+ size_t i;
+
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_INFO, "TCP:%p win_read", tcp);
+ }
if (tcp->shutting_down) {
GRPC_CLOSURE_SCHED(
@@ -229,18 +266,27 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
tcp->read_cb = cb;
tcp->read_slices = read_slices;
grpc_slice_buffer_reset_and_unref_internal(read_slices);
+ grpc_slice_buffer_swap(read_slices, &tcp->last_read_buffer);
- tcp->read_slice = GRPC_SLICE_MALLOC(8192);
+ if (tcp->read_slices->length < DEFAULT_TARGET_READ_SIZE / 2 &&
+ tcp->read_slices->count < MAX_WSABUF_COUNT) {
+ // TODO(jtattermusch): slice should be allocated using resource quota
+ grpc_slice_buffer_add(tcp->read_slices,
+ GRPC_SLICE_MALLOC(DEFAULT_TARGET_READ_SIZE));
+ }
- buffer.len = (ULONG)GRPC_SLICE_LENGTH(
- tcp->read_slice); // we know slice size fits in 32bit.
- buffer.buf = (char*)GRPC_SLICE_START_PTR(tcp->read_slice);
+ GPR_ASSERT(tcp->read_slices->count <= MAX_WSABUF_COUNT);
+ for (i = 0; i < tcp->read_slices->count; i++) {
+ buffers[i].len = (ULONG)GRPC_SLICE_LENGTH(
+ tcp->read_slices->slices[i]); // we know slice size fits in 32bit.
+ buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[i]);
+ }
TCP_REF(tcp, "read");
/* First let's try a synchronous, non-blocking read. */
- status =
- WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, NULL, NULL);
+ status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count,
+ &bytes_read, &flags, NULL, NULL);
info->wsa_error = status == 0 ? 0 : WSAGetLastError();
/* Did we get data immediately ? Yay. */
@@ -252,8 +298,8 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
/* Otherwise, let's retry, by queuing a read. */
memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
- status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
- &info->overlapped, NULL);
+ status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count,
+ &bytes_read, &flags, &info->overlapped, NULL);
if (status != 0) {
int wsa_error = WSAGetLastError();
@@ -275,6 +321,10 @@ static void on_write(void* tcpp, grpc_error* error) {
grpc_winsocket_callback_info* info = &handle->write_info;
grpc_closure* cb;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_INFO, "TCP:%p on_write", tcp);
+ }
+
GRPC_ERROR_REF(error);
gpr_mu_lock(&tcp->mu);
@@ -303,11 +353,21 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
unsigned i;
DWORD bytes_sent;
int status;
- WSABUF local_buffers[16];
+ WSABUF local_buffers[MAX_WSABUF_COUNT];
WSABUF* allocated = NULL;
WSABUF* buffers = local_buffers;
size_t len;
+ if (grpc_tcp_trace.enabled()) {
+ size_t i;
+ for (i = 0; i < slices->count; i++) {
+ char* data =
+ grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_INFO, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
+ gpr_free(data);
+ }
+ }
+
if (tcp->shutting_down) {
GRPC_CLOSURE_SCHED(
cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
@@ -412,6 +472,7 @@ static void win_shutdown(grpc_endpoint* ep, grpc_error* why) {
static void win_destroy(grpc_endpoint* ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp* tcp = (grpc_tcp*)ep;
+ grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
TCP_UNREF(tcp, "destroy");
}
@@ -463,6 +524,7 @@ grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
tcp->peer_string = gpr_strdup(peer_string);
+ grpc_slice_buffer_init(&tcp->last_read_buffer);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h
index f01852473b..d17a721606 100644
--- a/src/core/lib/surface/channel_init.h
+++ b/src/core/lib/surface/channel_init.h
@@ -45,6 +45,11 @@ void grpc_channel_init_init(void);
/// registration order (in the case of a tie).
/// Stages are registered against one of the pre-determined channel stack
/// types.
+/// If the channel stack type is GRPC_CLIENT_SUBCHANNEL, the caller should
+/// ensure that subchannels with different filter lists will always have
+/// different channel args. This requires setting a channel arg in case the
+/// registration function relies on some condition other than channel args to
+/// decide whether to add a filter or not.
void grpc_channel_init_register_stage(grpc_channel_stack_type type,
int priority,
grpc_channel_init_stage stage_fn,
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 67b38e6f0c..7ae6e51a5f 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -194,13 +194,10 @@ struct call_data {
};
struct request_matcher {
- request_matcher(grpc_server* server);
- ~request_matcher();
-
grpc_server* server;
- std::atomic<call_data*> pending_head{nullptr};
- call_data* pending_tail = nullptr;
- gpr_locked_mpscq* requests_per_cq = nullptr;
+ call_data* pending_head;
+ call_data* pending_tail;
+ gpr_locked_mpscq* requests_per_cq;
};
struct registered_method {
@@ -349,30 +346,22 @@ static void channel_broadcaster_shutdown(channel_broadcaster* cb,
* request_matcher
*/
-namespace {
-request_matcher::request_matcher(grpc_server* server) : server(server) {
- requests_per_cq = static_cast<gpr_locked_mpscq*>(
- gpr_malloc(sizeof(*requests_per_cq) * server->cq_count));
- for (size_t i = 0; i < server->cq_count; i++) {
- gpr_locked_mpscq_init(&requests_per_cq[i]);
- }
-}
-
-request_matcher::~request_matcher() {
+static void request_matcher_init(request_matcher* rm, grpc_server* server) {
+ memset(rm, 0, sizeof(*rm));
+ rm->server = server;
+ rm->requests_per_cq = static_cast<gpr_locked_mpscq*>(
+ gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));
for (size_t i = 0; i < server->cq_count; i++) {
- GPR_ASSERT(gpr_locked_mpscq_pop(&requests_per_cq[i]) == nullptr);
- gpr_locked_mpscq_destroy(&requests_per_cq[i]);
+ gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
}
- gpr_free(requests_per_cq);
-}
-} // namespace
-
-static void request_matcher_init(request_matcher* rm, grpc_server* server) {
- new (rm) request_matcher(server);
}
static void request_matcher_destroy(request_matcher* rm) {
- rm->~request_matcher();
+ for (size_t i = 0; i < rm->server->cq_count; i++) {
+ GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr);
+ gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
+ }
+ gpr_free(rm->requests_per_cq);
}
static void kill_zombie(void* elem, grpc_error* error) {
@@ -381,10 +370,9 @@ static void kill_zombie(void* elem, grpc_error* error) {
}
static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
- call_data* calld;
- while ((calld = rm->pending_head.load(std::memory_order_relaxed)) !=
- nullptr) {
- rm->pending_head.store(calld->pending_next, std::memory_order_relaxed);
+ while (rm->pending_head) {
+ call_data* calld = rm->pending_head;
+ rm->pending_head = calld->pending_next;
gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
@@ -582,9 +570,8 @@ static void publish_new_rpc(void* arg, grpc_error* error) {
}
gpr_atm_no_barrier_store(&calld->state, PENDING);
- if (rm->pending_head.load(std::memory_order_relaxed) == nullptr) {
- rm->pending_head.store(calld, std::memory_order_relaxed);
- rm->pending_tail = calld;
+ if (rm->pending_head == nullptr) {
+ rm->pending_tail = rm->pending_head = calld;
} else {
rm->pending_tail->pending_next = calld;
rm->pending_tail = calld;
@@ -1448,39 +1435,30 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
rm = &rc->data.registered.method->matcher;
break;
}
-
- // Fast path: if there is no pending request to be processed, immediately
- // return.
- if (!gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link) ||
- // Note: We are reading the pending_head without holding the server's call
- // mutex. Even if we read a non-null value here due to reordering,
- // we will check it below again after grabbing the lock.
- rm->pending_head.load(std::memory_order_relaxed) == nullptr) {
- return GRPC_CALL_OK;
- }
- // Slow path: This was the first queued request and there are pendings:
- // We need to lock and start matching calls.
- gpr_mu_lock(&server->mu_call);
- while ((calld = rm->pending_head.load(std::memory_order_relaxed)) !=
- nullptr) {
- rc = reinterpret_cast<requested_call*>(
- gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
- if (rc == nullptr) break;
- rm->pending_head.store(calld->pending_next, std::memory_order_relaxed);
- gpr_mu_unlock(&server->mu_call);
- if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
- // Zombied Call
- GRPC_CLOSURE_INIT(
- &calld->kill_zombie_closure, kill_zombie,
- grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
- } else {
- publish_call(server, calld, cq_idx, rc);
- }
+ if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
+ /* this was the first queued request: we need to lock and start
+ matching calls */
gpr_mu_lock(&server->mu_call);
+ while ((calld = rm->pending_head) != nullptr) {
+ rc = reinterpret_cast<requested_call*>(
+ gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
+ if (rc == nullptr) break;
+ rm->pending_head = calld->pending_next;
+ gpr_mu_unlock(&server->mu_call);
+ if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
+ // Zombied Call
+ GRPC_CLOSURE_INIT(
+ &calld->kill_zombie_closure, kill_zombie,
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
+ } else {
+ publish_call(server, calld, cq_idx, rc);
+ }
+ gpr_mu_lock(&server->mu_call);
+ }
+ gpr_mu_unlock(&server->mu_call);
}
- gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}
diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc
index 4829cc80a5..70d7580bec 100644
--- a/src/core/lib/surface/version.cc
+++ b/src/core/lib/surface/version.cc
@@ -25,4 +25,4 @@
const char* grpc_version_string(void) { return "7.0.0-dev"; }
-const char* grpc_g_stands_for(void) { return "goose"; }
+const char* grpc_g_stands_for(void) { return "gold"; }
diff --git a/src/core/lib/transport/metadata.cc b/src/core/lib/transport/metadata.cc
index 60af22393e..30482a1b3b 100644
--- a/src/core/lib/transport/metadata.cc
+++ b/src/core/lib/transport/metadata.cc
@@ -187,6 +187,7 @@ static void gc_mdtab(mdtab_shard* shard) {
((destroy_user_data_func)gpr_atm_no_barrier_load(
&md->destroy_user_data))(user_data);
}
+ gpr_mu_destroy(&md->mu_user_data);
gpr_free(md);
*prev_next = next;
num_freed++;