diff options
Diffstat (limited to 'src/core')
26 files changed, 340 insertions, 224 deletions
diff --git a/src/core/channel/census_filter.c b/src/core/census/grpc_filter.c index d996c3475e..fbedb35661 100644 --- a/src/core/channel/census_filter.c +++ b/src/core/census/grpc_filter.c @@ -31,11 +31,13 @@ * */ -#include "src/core/channel/census_filter.h" +#include "src/core/census/grpc_filter.h" #include <stdio.h> #include <string.h> +#include "include/grpc/census.h" +#include "src/core/census/rpc_stat_id.h" #include "src/core/channel/channel_stack.h" #include "src/core/channel/noop_filter.h" #include "src/core/statistics/census_interface.h" @@ -47,24 +49,19 @@ typedef struct call_data { census_op_id op_id; - census_rpc_stats stats; + census_context* ctxt; gpr_timespec start_ts; + int error; /* recv callback */ grpc_stream_op_buffer* recv_ops; - void (*on_done_recv)(void* user_data, int success); - void* recv_user_data; + grpc_iomgr_closure* on_done_recv; } call_data; typedef struct channel_data { grpc_mdstr* path_str; /* pointer to meta data str with key == ":path" */ } channel_data; -static void init_rpc_stats(census_rpc_stats* stats) { - memset(stats, 0, sizeof(census_rpc_stats)); - stats->cnt = 1; -} - static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb, call_data* calld, channel_data* chand) { @@ -77,8 +74,7 @@ static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb, if (m->md->key == chand->path_str) { gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); - census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR( - m->md->value->slice)); + /* Add method tag here */ } } } @@ -95,8 +91,6 @@ static void client_mutate_op(grpc_call_element* elem, static void client_start_transport_op(grpc_call_element* elem, grpc_transport_stream_op* op) { - call_data* calld = elem->call_data; - GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); client_mutate_op(elem, op); grpc_call_next_op(elem, op); } @@ -108,7 +102,7 @@ static void server_on_done_recv(void* ptr, int success) { if (success) { extract_and_annotate_method_tag(calld->recv_ops, calld, chand); } - calld->on_done_recv(calld->recv_user_data, success); + calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } static void server_mutate_op(grpc_call_element* elem, @@ -118,9 +112,7 @@ static void server_mutate_op(grpc_call_element* elem, /* substitute our callback for the op callback */ calld->recv_ops = op->recv_ops; calld->on_done_recv = op->on_done_recv; - calld->recv_user_data = op->recv_user_data; - op->on_done_recv = server_on_done_recv; - op->recv_user_data = elem; + op->on_done_recv = calld->on_done_recv; } } @@ -132,35 +124,19 @@ static void server_start_transport_op(grpc_call_element* elem, grpc_call_next_op(elem, op); } -static void channel_op(grpc_channel_element* elem, - grpc_channel_element* from_elem, grpc_channel_op* op) { - switch (op->type) { - case GRPC_TRANSPORT_CLOSED: - /* TODO(hongyu): Annotate trace information for all calls of the channel - */ - break; - default: - break; - } - grpc_channel_next_op(elem, op); -} - static void client_init_call_elem(grpc_call_element* elem, const void* server_transport_data, grpc_transport_stream_op* initial_op) { call_data* d = elem->call_data; GPR_ASSERT(d != NULL); - init_rpc_stats(&d->stats); d->start_ts = gpr_now(GPR_CLOCK_REALTIME); - d->op_id = census_tracing_start_op(); if (initial_op) client_mutate_op(elem, initial_op); } static void client_destroy_call_elem(grpc_call_element* elem) { call_data* d = elem->call_data; GPR_ASSERT(d != NULL); - census_record_rpc_client_stats(d->op_id, &d->stats); - census_tracing_end_op(d->op_id); + /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */ } static void server_init_call_elem(grpc_call_element* elem, @@ -168,29 +144,24 @@ static void server_init_call_elem(grpc_call_element* elem, grpc_transport_stream_op* initial_op) { call_data* d = elem->call_data; GPR_ASSERT(d != NULL); - init_rpc_stats(&d->stats); d->start_ts = gpr_now(GPR_CLOCK_REALTIME); - d->op_id = census_tracing_start_op(); + /* TODO(hongyu): call census_tracing_start_op here. */ + grpc_iomgr_closure_init(d->on_done_recv, server_on_done_recv, elem); if (initial_op) server_mutate_op(elem, initial_op); } static void server_destroy_call_elem(grpc_call_element* elem) { call_data* d = elem->call_data; GPR_ASSERT(d != NULL); - d->stats.elapsed_time_ms = gpr_timespec_to_micros( - gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), d->start_ts)); - census_record_rpc_server_stats(d->op_id, &d->stats); - census_tracing_end_op(d->op_id); + /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */ } -static void init_channel_elem(grpc_channel_element* elem, +static void init_channel_elem(grpc_channel_element* elem, grpc_channel* master, const grpc_channel_args* args, grpc_mdctx* mdctx, int is_first, int is_last) { channel_data* chand = elem->channel_data; GPR_ASSERT(chand != NULL); - GPR_ASSERT(!is_first); - GPR_ASSERT(!is_last); - chand->path_str = grpc_mdstr_from_string(mdctx, ":path"); + chand->path_str = grpc_mdstr_from_string(mdctx, ":path", 0); } static void destroy_channel_elem(grpc_channel_element* elem) { @@ -203,22 +174,24 @@ static void destroy_channel_elem(grpc_channel_element* elem) { const grpc_channel_filter grpc_client_census_filter = { client_start_transport_op, - channel_op, + grpc_channel_next_op, sizeof(call_data), client_init_call_elem, client_destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "census-client"}; const grpc_channel_filter grpc_server_census_filter = { server_start_transport_op, - channel_op, + grpc_channel_next_op, sizeof(call_data), server_init_call_elem, server_destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "census-server"}; diff --git a/src/core/channel/census_filter.h b/src/core/census/grpc_filter.h index 1453c05d28..1453c05d28 100644 --- a/src/core/channel/census_filter.h +++ b/src/core/census/grpc_filter.h diff --git a/src/core/channel/compress_filter.h b/src/core/channel/compress_filter.h index 0917e81ca4..415459bca6 100644 --- a/src/core/channel/compress_filter.h +++ b/src/core/channel/compress_filter.h @@ -36,7 +36,7 @@ #include "src/core/channel/channel_stack.h" -#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "internal:grpc-encoding-request" +#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "grpc-internal-encoding-request" /** Compression filter for outgoing data. * diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 48c623d359..2b61d33c29 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -45,7 +45,6 @@ typedef struct call_data { grpc_linked_mdelem content_type; grpc_linked_mdelem user_agent; int sent_initial_metadata; - int sent_authority; int got_initial_metadata; grpc_stream_op_buffer *recv_ops; @@ -64,7 +63,6 @@ typedef struct channel_data { grpc_mdelem *scheme; grpc_mdelem *content_type; grpc_mdelem *status; - grpc_mdelem *default_authority; /** complete user agent mdelem */ grpc_mdelem *user_agent; } channel_data; @@ -103,7 +101,6 @@ static void hc_on_recv(void *user_data, int success) { static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; - call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; /* eat the things we'd like to set ourselves */ if (md->key == channeld->method->key) return NULL; @@ -111,10 +108,6 @@ static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { if (md->key == channeld->te_trailers->key) return NULL; if (md->key == channeld->content_type->key) return NULL; if (md->key == channeld->user_agent->key) return NULL; - if (channeld->default_authority && - channeld->default_authority->key == md->key) { - calld->sent_authority = 1; - } return md; } @@ -138,11 +131,6 @@ static void hc_mutate_op(grpc_call_element *elem, GRPC_MDELEM_REF(channeld->method)); grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme, GRPC_MDELEM_REF(channeld->scheme)); - if (channeld->default_authority && !calld->sent_authority) { - grpc_metadata_batch_add_head( - &op->data.metadata, &calld->authority, - GRPC_MDELEM_REF(channeld->default_authority)); - } grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers, GRPC_MDELEM_REF(channeld->te_trailers)); grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, @@ -175,7 +163,6 @@ static void init_call_elem(grpc_call_element *elem, call_data *calld = elem->call_data; calld->sent_initial_metadata = 0; calld->got_initial_metadata = 0; - calld->sent_authority = 0; calld->on_done_recv = NULL; grpc_iomgr_closure_init(&calld->hc_on_recv, hc_on_recv, elem); if (initial_op) hc_mutate_op(elem, initial_op); @@ -257,8 +244,6 @@ static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx, static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *channel_args, grpc_mdctx *mdctx, int is_first, int is_last) { - size_t i; - /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; @@ -267,21 +252,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, path */ GPR_ASSERT(!is_last); - channeld->default_authority = NULL; - if (channel_args) { - for (i = 0; i < channel_args->num_args; i++) { - if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) { - if (channel_args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "%s: must be an string", - GRPC_ARG_DEFAULT_AUTHORITY); - } else { - channeld->default_authority = grpc_mdelem_from_strings( - mdctx, ":authority", channel_args->args[i].value.string); - } - } - } - } - /* initialize members */ channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers"); channeld->method = grpc_mdelem_from_strings(mdctx, ":method", "POST"); @@ -306,9 +276,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { GRPC_MDELEM_UNREF(channeld->content_type); GRPC_MDELEM_UNREF(channeld->status); GRPC_MDELEM_UNREF(channeld->user_agent); - if (channeld->default_authority) { - GRPC_MDELEM_UNREF(channeld->default_authority); - } } const grpc_channel_filter grpc_http_client_filter = { diff --git a/src/core/client_config/resolver_factory.c b/src/core/client_config/resolver_factory.c index 6721977e21..5b859a8d10 100644 --- a/src/core/client_config/resolver_factory.c +++ b/src/core/client_config/resolver_factory.c @@ -45,6 +45,12 @@ void grpc_resolver_factory_unref(grpc_resolver_factory *factory) { grpc_resolver *grpc_resolver_factory_create_resolver( grpc_resolver_factory *factory, grpc_uri *uri, grpc_subchannel_factory *subchannel_factory) { - if (!factory) return NULL; + if (factory == NULL) return NULL; return factory->vtable->create_resolver(factory, uri, subchannel_factory); } + +char *grpc_resolver_factory_get_default_authority( + grpc_resolver_factory *factory, grpc_uri *uri) { + if (factory == NULL) return NULL; + return factory->vtable->get_default_authority(factory, uri); +} diff --git a/src/core/client_config/resolver_factory.h b/src/core/client_config/resolver_factory.h index c5d85499c6..e243b23988 100644 --- a/src/core/client_config/resolver_factory.h +++ b/src/core/client_config/resolver_factory.h @@ -51,9 +51,16 @@ struct grpc_resolver_factory_vtable { void (*ref)(grpc_resolver_factory *factory); void (*unref)(grpc_resolver_factory *factory); + /** Implementation of grpc_resolver_factory_create_resolver */ grpc_resolver *(*create_resolver)( grpc_resolver_factory *factory, grpc_uri *uri, grpc_subchannel_factory *subchannel_factory); + + /** Implementation of grpc_resolver_factory_get_default_authority */ + char *(*get_default_authority)(grpc_resolver_factory *factory, grpc_uri *uri); + + /** URI scheme that this factory implements */ + const char *scheme; }; void grpc_resolver_factory_ref(grpc_resolver_factory *resolver); @@ -64,4 +71,9 @@ grpc_resolver *grpc_resolver_factory_create_resolver( grpc_resolver_factory *factory, grpc_uri *uri, grpc_subchannel_factory *subchannel_factory); +/** Return a (freshly allocated with gpr_malloc) string representing + the default authority to use for this scheme. */ +char *grpc_resolver_factory_get_default_authority( + grpc_resolver_factory *factory, grpc_uri *uri); + #endif /* GRPC_INTERNAL_CORE_CONFIG_RESOLVER_FACTORY_H */ diff --git a/src/core/client_config/resolver_registry.c b/src/core/client_config/resolver_registry.c index 16be2da994..37979b3b86 100644 --- a/src/core/client_config/resolver_registry.c +++ b/src/core/client_config/resolver_registry.c @@ -41,41 +41,33 @@ #define MAX_RESOLVERS 10 -typedef struct { - char *scheme; - grpc_resolver_factory *factory; -} registered_resolver; - -static registered_resolver g_all_of_the_resolvers[MAX_RESOLVERS]; +static grpc_resolver_factory *g_all_of_the_resolvers[MAX_RESOLVERS]; static int g_number_of_resolvers = 0; -static char *g_default_resolver_scheme; +static char *g_default_resolver_prefix; -void grpc_resolver_registry_init(const char *default_resolver_scheme) { +void grpc_resolver_registry_init(const char *default_resolver_prefix) { g_number_of_resolvers = 0; - g_default_resolver_scheme = gpr_strdup(default_resolver_scheme); + g_default_resolver_prefix = gpr_strdup(default_resolver_prefix); } void grpc_resolver_registry_shutdown(void) { int i; for (i = 0; i < g_number_of_resolvers; i++) { - gpr_free(g_all_of_the_resolvers[i].scheme); - grpc_resolver_factory_unref(g_all_of_the_resolvers[i].factory); + grpc_resolver_factory_unref(g_all_of_the_resolvers[i]); } - gpr_free(g_default_resolver_scheme); + gpr_free(g_default_resolver_prefix); } -void grpc_register_resolver_type(const char *scheme, - grpc_resolver_factory *factory) { +void grpc_register_resolver_type(grpc_resolver_factory *factory) { int i; for (i = 0; i < g_number_of_resolvers; i++) { - GPR_ASSERT(0 != strcmp(scheme, g_all_of_the_resolvers[i].scheme)); + GPR_ASSERT(0 != strcmp(factory->vtable->scheme, + g_all_of_the_resolvers[i]->vtable->scheme)); } GPR_ASSERT(g_number_of_resolvers != MAX_RESOLVERS); - g_all_of_the_resolvers[g_number_of_resolvers].scheme = gpr_strdup(scheme); grpc_resolver_factory_ref(factory); - g_all_of_the_resolvers[g_number_of_resolvers].factory = factory; - g_number_of_resolvers++; + g_all_of_the_resolvers[g_number_of_resolvers++] = factory; } static grpc_resolver_factory *lookup_factory(grpc_uri *uri) { @@ -85,40 +77,57 @@ static grpc_resolver_factory *lookup_factory(grpc_uri *uri) { if (!uri) return NULL; for (i = 0; i < g_number_of_resolvers; i++) { - if (0 == strcmp(uri->scheme, g_all_of_the_resolvers[i].scheme)) { - return g_all_of_the_resolvers[i].factory; + if (0 == strcmp(uri->scheme, g_all_of_the_resolvers[i]->vtable->scheme)) { + return g_all_of_the_resolvers[i]; } } return NULL; } -grpc_resolver *grpc_resolver_create( - const char *name, grpc_subchannel_factory *subchannel_factory) { - grpc_uri *uri; +static grpc_resolver_factory *resolve_factory(const char *target, + grpc_uri **uri) { char *tmp; grpc_resolver_factory *factory = NULL; - grpc_resolver *resolver; - - uri = grpc_uri_parse(name, 1); - factory = lookup_factory(uri); - if (factory == NULL && g_default_resolver_scheme != NULL) { - grpc_uri_destroy(uri); - gpr_asprintf(&tmp, "%s%s", g_default_resolver_scheme, name); - uri = grpc_uri_parse(tmp, 1); - factory = lookup_factory(uri); - if (factory == NULL) { - grpc_uri_destroy(grpc_uri_parse(name, 0)); - grpc_uri_destroy(grpc_uri_parse(tmp, 0)); - gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", name, tmp); + + GPR_ASSERT(uri != NULL); + *uri = grpc_uri_parse(target, 1); + factory = lookup_factory(*uri); + if (factory == NULL) { + if (g_default_resolver_prefix != NULL) { + grpc_uri_destroy(*uri); + gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target); + *uri = grpc_uri_parse(tmp, 1); + factory = lookup_factory(*uri); + if (factory == NULL) { + grpc_uri_destroy(grpc_uri_parse(target, 0)); + grpc_uri_destroy(grpc_uri_parse(tmp, 0)); + gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, + tmp); + } + gpr_free(tmp); + } else { + grpc_uri_destroy(grpc_uri_parse(target, 0)); + gpr_log(GPR_ERROR, "don't know how to resolve '%s'", target); } - gpr_free(tmp); - } else if (factory == NULL) { - grpc_uri_destroy(grpc_uri_parse(name, 0)); - gpr_log(GPR_ERROR, "don't know how to resolve '%s'", name); } - resolver = + return factory; +} + +grpc_resolver *grpc_resolver_create( + const char *target, grpc_subchannel_factory *subchannel_factory) { + grpc_uri *uri = NULL; + grpc_resolver_factory *factory = resolve_factory(target, &uri); + grpc_resolver *resolver = grpc_resolver_factory_create_resolver(factory, uri, subchannel_factory); grpc_uri_destroy(uri); return resolver; } + +char *grpc_get_default_authority(const char *target) { + grpc_uri *uri = NULL; + grpc_resolver_factory *factory = resolve_factory(target, &uri); + char *authority = grpc_resolver_factory_get_default_authority(factory, uri); + grpc_uri_destroy(uri); + return authority; +} diff --git a/src/core/client_config/resolver_registry.h b/src/core/client_config/resolver_registry.h index 31aa47620a..5a7193b7ae 100644 --- a/src/core/client_config/resolver_registry.h +++ b/src/core/client_config/resolver_registry.h @@ -44,19 +44,22 @@ void grpc_resolver_registry_shutdown(void); If \a priority is greater than zero, then the resolver will be eligible to resolve names that are passed in with no scheme. Higher priority resolvers will be tried before lower priority schemes. */ -void grpc_register_resolver_type(const char *scheme, - grpc_resolver_factory *factory); +void grpc_register_resolver_type(grpc_resolver_factory *factory); -/** Create a resolver given \a name. - First tries to parse \a name as a URI. If this succeeds, tries +/** Create a resolver given \a target. + First tries to parse \a target as a URI. If this succeeds, tries to locate a registered resolver factory based on the URI scheme. If parsing or location fails, prefixes default_prefix from - grpc_resolver_registry_init to name, and tries again (if default_prefix + grpc_resolver_registry_init to target, and tries again (if default_prefix was not NULL). If a resolver factory was found, use it to instantiate a resolver and return it. If a resolver factory was not found, return NULL. */ grpc_resolver *grpc_resolver_create( - const char *name, grpc_subchannel_factory *subchannel_factory); + const char *target, grpc_subchannel_factory *subchannel_factory); + +/** Given a target, return a (freshly allocated with gpr_malloc) string + representing the default authority to pass from a client. */ +char *grpc_get_default_authority(const char *target); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVER_REGISTRY_H */ diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 7b35b7902f..84643c464a 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -203,9 +203,6 @@ static grpc_resolver *dns_create( grpc_subchannel_factory *subchannel_factory) { dns_resolver *r; const char *path = uri->path; - grpc_arg default_host_arg; - char *host; - char *port; if (0 != strcmp(uri->authority, "")) { gpr_log(GPR_ERROR, "authority based uri's not supported"); @@ -214,17 +211,6 @@ static grpc_resolver *dns_create( if (path[0] == '/') ++path; - gpr_split_host_port(path, &host, &port); - - default_host_arg.type = GRPC_ARG_STRING; - default_host_arg.key = GRPC_ARG_DEFAULT_AUTHORITY; - default_host_arg.value.string = host; - subchannel_factory = grpc_subchannel_factory_add_channel_arg( - subchannel_factory, &default_host_arg); - - gpr_free(host); - gpr_free(port); - r = gpr_malloc(sizeof(dns_resolver)); memset(r, 0, sizeof(*r)); gpr_ref_init(&r->refs, 1); @@ -233,6 +219,7 @@ static grpc_resolver *dns_create( r->name = gpr_strdup(path); r->default_port = gpr_strdup(default_port); r->subchannel_factory = subchannel_factory; + grpc_subchannel_factory_ref(subchannel_factory); r->lb_policy_factory = lb_policy_factory; return &r->base; } @@ -252,8 +239,16 @@ static grpc_resolver *dns_factory_create_resolver( subchannel_factory); } +char *dns_factory_get_default_host_name(grpc_resolver_factory *factory, + grpc_uri *uri) { + const char *path = uri->path; + if (path[0] == '/') ++path; + return gpr_strdup(path); +} + static const grpc_resolver_factory_vtable dns_factory_vtable = { - dns_factory_ref, dns_factory_unref, dns_factory_create_resolver}; + dns_factory_ref, dns_factory_unref, dns_factory_create_resolver, + dns_factory_get_default_host_name, "dns"}; static grpc_resolver_factory dns_resolver_factory = {&dns_factory_vtable}; grpc_resolver_factory *grpc_dns_resolver_factory_create() { diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 74584e7e2c..0d8540a566 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -60,9 +60,12 @@ typedef struct { grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, size_t num_subchannels); - /** the address that we've 'resolved' */ - struct sockaddr_storage addr; - int addr_len; + /** the addresses that we've 'resolved' */ + struct sockaddr_storage *addrs; + /** the corresponding length of the addresses */ + int *addrs_len; + /** how many elements in \a addrs */ + size_t num_addrs; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -119,17 +122,22 @@ static void sockaddr_next(grpc_resolver *resolver, static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { grpc_client_config *cfg; grpc_lb_policy *lb_policy; - grpc_subchannel *subchannel; + grpc_subchannel **subchannels; grpc_subchannel_args args; if (r->next_completion != NULL && !r->published) { + size_t i; cfg = grpc_client_config_create(); - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)&r->addr; - args.addr_len = r->addr_len; - subchannel = - grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args); - lb_policy = r->lb_policy_factory(&subchannel, 1); + subchannels = gpr_malloc(sizeof(grpc_subchannel *) * r->num_addrs); + for (i = 0; i < r->num_addrs; i++) { + memset(&args, 0, sizeof(args)); + args.addr = (struct sockaddr *)&r->addrs[i]; + args.addr_len = r->addrs_len[i]; + subchannels[i] = grpc_subchannel_factory_create_subchannel( + r->subchannel_factory, &args); + } + lb_policy = r->lb_policy_factory(subchannels, r->num_addrs); + gpr_free(subchannels); grpc_client_config_set_lb_policy(cfg, lb_policy); GRPC_LB_POLICY_UNREF(lb_policy, "unix"); r->published = 1; @@ -143,6 +151,8 @@ static void sockaddr_destroy(grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); grpc_subchannel_factory_unref(r->subchannel_factory); + gpr_free(r->addrs); + gpr_free(r->addrs_len); gpr_free(r); } @@ -156,8 +166,29 @@ static int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { return 1; } + +static char *unix_get_default_authority(grpc_resolver_factory *factory, + grpc_uri *uri) { + return gpr_strdup("localhost"); +} #endif +static char *ip_get_default_authority(grpc_uri *uri) { + const char *path = uri->path; + if (path[0] == '/') ++path; + return gpr_strdup(path); +} + +static char *ipv4_get_default_authority(grpc_resolver_factory *factory, + grpc_uri *uri) { + return ip_get_default_authority(uri); +} + +static char *ipv6_get_default_authority(grpc_resolver_factory *factory, + grpc_uri *uri) { + return ip_get_default_authority(uri); +} + static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { const char *host_port = uri->path; char *host; @@ -238,13 +269,18 @@ done: return result; } +static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( grpc_uri *uri, grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, size_t num_subchannels), grpc_subchannel_factory *subchannel_factory, int parse(grpc_uri *uri, struct sockaddr_storage *dst, int *len)) { + size_t i; + int errors_found = 0; /* GPR_FALSE */ sockaddr_resolver *r; + gpr_slice path_slice; + gpr_slice_buffer path_parts; if (0 != strcmp(uri->authority, "")) { gpr_log(GPR_ERROR, "authority based uri's not supported"); @@ -253,7 +289,29 @@ static grpc_resolver *sockaddr_create( r = gpr_malloc(sizeof(sockaddr_resolver)); memset(r, 0, sizeof(*r)); - if (!parse(uri, &r->addr, &r->addr_len)) { + + path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing); + gpr_slice_buffer_init(&path_parts); + + gpr_slice_split(path_slice, ",", &path_parts); + r->num_addrs = path_parts.count; + r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs); + r->addrs_len = gpr_malloc(sizeof(int) * r->num_addrs); + + for(i = 0; i < r->num_addrs; i++) { + grpc_uri ith_uri = *uri; + char* part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); + ith_uri.path = part_str; + if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) { + errors_found = 1; /* GPR_TRUE */ + } + gpr_free(part_str); + if (errors_found) break; + } + + gpr_slice_buffer_destroy(&path_parts); + gpr_slice_unref(path_slice); + if (errors_found) { gpr_free(r); return NULL; } @@ -276,20 +334,20 @@ static void sockaddr_factory_ref(grpc_resolver_factory *factory) {} static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} -#define DECL_FACTORY(name) \ - static grpc_resolver *name##_factory_create_resolver( \ - grpc_resolver_factory *factory, grpc_uri *uri, \ - grpc_subchannel_factory *subchannel_factory) { \ - return sockaddr_create(uri, grpc_create_pick_first_lb_policy, \ - subchannel_factory, parse_##name); \ - } \ - static const grpc_resolver_factory_vtable name##_factory_vtable = { \ - sockaddr_factory_ref, sockaddr_factory_unref, \ - name##_factory_create_resolver}; \ - static grpc_resolver_factory name##_resolver_factory = { \ - &name##_factory_vtable}; \ - grpc_resolver_factory *grpc_##name##_resolver_factory_create() { \ - return &name##_resolver_factory; \ +#define DECL_FACTORY(name) \ + static grpc_resolver *name##_factory_create_resolver( \ + grpc_resolver_factory *factory, grpc_uri *uri, \ + grpc_subchannel_factory *subchannel_factory) { \ + return sockaddr_create(uri, grpc_create_pick_first_lb_policy, \ + subchannel_factory, parse_##name); \ + } \ + static const grpc_resolver_factory_vtable name##_factory_vtable = { \ + sockaddr_factory_ref, sockaddr_factory_unref, \ + name##_factory_create_resolver, name##_get_default_authority, #name}; \ + static grpc_resolver_factory name##_resolver_factory = { \ + &name##_factory_vtable}; \ + grpc_resolver_factory *grpc_##name##_resolver_factory_create() { \ + return &name##_resolver_factory; \ } #ifdef GPR_POSIX_SOCKET diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index acb2ba136e..da399f9954 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -467,8 +467,7 @@ static grpc_resolver *zookeeper_create( } static void zookeeper_plugin_init() { - grpc_register_resolver_type("zookeeper", - grpc_zookeeper_resolver_factory_create()); + grpc_register_resolver_type(grpc_zookeeper_resolver_factory_create()); } void grpc_zookeeper_register() { @@ -483,6 +482,11 @@ static void zookeeper_factory_ref(grpc_resolver_factory *factory) {} static void zookeeper_factory_unref(grpc_resolver_factory *factory) {} +static char *zookeeper_factory_get_default_hostname( + grpc_resolver_factory *factory, grpc_uri *uri) { + return NULL; +} + static grpc_resolver *zookeeper_factory_create_resolver( grpc_resolver_factory *factory, grpc_uri *uri, grpc_subchannel_factory *subchannel_factory) { @@ -492,7 +496,8 @@ static grpc_resolver *zookeeper_factory_create_resolver( static const grpc_resolver_factory_vtable zookeeper_factory_vtable = { zookeeper_factory_ref, zookeeper_factory_unref, - zookeeper_factory_create_resolver}; + zookeeper_factory_create_resolver, zookeeper_factory_get_default_hostname, + "zookeeper"}; static grpc_resolver_factory zookeeper_resolver_factory = { &zookeeper_factory_vtable}; diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 469c382218..e3abe1bebc 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -144,6 +144,7 @@ static int on_read(grpc_tcp *tcp, int from_iocp) { gpr_free(utf8_message); } success = 0; + gpr_slice_unref(tcp->read_slice); } else { if (info->bytes_transfered != 0) { sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered); diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index 16482c08f7..6429c38b28 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -235,8 +235,10 @@ static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) { rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, sizeof(get_local_ip)); if (rc == 0 && addr->sa_family == AF_INET6) { +#if !TARGET_OS_IPHONE rc = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip, sizeof(get_local_ip)); +#endif } if (bind(fd, addr, addr_len) < 0) { diff --git a/src/core/profiling/basic_timers.c b/src/core/profiling/basic_timers.c index ae37f584eb..4b6a0d2f56 100644 --- a/src/core/profiling/basic_timers.c +++ b/src/core/profiling/basic_timers.c @@ -36,7 +36,6 @@ #ifdef GRPC_BASIC_PROFILER #include "src/core/profiling/timers.h" -#include "src/core/profiling/timers_preciseclock.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -53,7 +52,7 @@ typedef enum { } marker_type; typedef struct grpc_timer_entry { - grpc_precise_clock tm; + gpr_timespec tm; int tag; const char* tagstr; marker_type type; @@ -71,9 +70,8 @@ static void log_report() { int i; for (i = 0; i < count; i++) { grpc_timer_entry* entry = &(log[i]); - printf("GRPC_LAT_PROF " GRPC_PRECISE_CLOCK_FORMAT - " %p %c %d(%s) %p %s %d\n", - GRPC_PRECISE_CLOCK_PRINTF_ARGS(&entry->tm), + printf("GRPC_LAT_PROF %ld.%09d %p %c %d(%s) %p %s %d\n", + entry->tm.tv_sec, entry->tm.tv_nsec, (void*)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tag, entry->tagstr, entry->id, entry->file, entry->line); } @@ -93,7 +91,7 @@ static void grpc_timers_log_add(int tag, const char* tagstr, marker_type type, entry = &log[count++]; - grpc_precise_clock_now(&entry->tm); + entry->tm = gpr_now(GPR_CLOCK_PRECISE); entry->tag = tag; entry->tagstr = tagstr; entry->type = type; diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c index 841485c4b4..a274400243 100644 --- a/src/core/support/time_posix.c +++ b/src/core/support/time_posix.c @@ -32,6 +32,7 @@ */ #include <grpc/support/port_platform.h> +#include <src/core/support/time_precise.h> #ifdef GPR_POSIX_TIME @@ -66,8 +67,14 @@ void gpr_time_init(void) {} gpr_timespec gpr_now(gpr_clock_type clock) { struct timespec now; GPR_ASSERT(clock != GPR_TIMESPAN); - clock_gettime(clockid_for_gpr_clock[clock], &now); - return gpr_from_timespec(now, clock); + if (clock == GPR_CLOCK_PRECISE) { + gpr_timespec ret; + gpr_precise_clock_now(&ret); + return ret; + } else { + clock_gettime(clockid_for_gpr_clock[clock], &now); + return gpr_from_timespec(now, clock); + } } #else /* For some reason Apple's OSes haven't implemented clock_gettime. */ @@ -104,6 +111,9 @@ gpr_timespec gpr_now(gpr_clock_type clock) { now.tv_sec = now_dbl * 1e-9; now.tv_nsec = now_dbl - now.tv_sec * 1e9; break; + case GPR_CLOCK_PRECISE: + gpr_precise_clock_now(&now); + break; case GPR_TIMESPAN: abort(); } diff --git a/src/core/profiling/timers_preciseclock.h b/src/core/support/time_precise.h index 5c251b47e6..574ebb8448 100644 --- a/src/core/profiling/timers_preciseclock.h +++ b/src/core/support/time_precise.h @@ -31,65 +31,63 @@ * */ -#ifndef GRPC_CORE_PROFILING_TIMERS_PRECISECLOCK_H -#define GRPC_CORE_PROFILING_TIMERS_PRECISECLOCK_H +#ifndef GRPC_CORE_SUPPORT_TIME_PRECISE_H_ +#define GRPC_CORE_SUPPORT_TIME_PRECISE_H_ #include <grpc/support/sync.h> #include <grpc/support/time.h> #include <stdio.h> #ifdef GRPC_TIMERS_RDTSC -typedef long long int grpc_precise_clock; #if defined(__i386__) -static void grpc_precise_clock_now(grpc_precise_clock *clk) { - grpc_precise_clock ret; +static void gpr_get_cycle_counter(long long int *clk) { + long long int ret; __asm__ volatile("rdtsc" : "=A"(ret)); *clk = ret; } // ---------------------------------------------------------------- #elif defined(__x86_64__) || defined(__amd64__) -static void grpc_precise_clock_now(grpc_precise_clock *clk) { +static void gpr_get_cycle_counter(long long int *clk) { unsigned long long low, high; __asm__ volatile("rdtsc" : "=a"(low), "=d"(high)); *clk = (high << 32) | low; } #endif + static gpr_once precise_clock_init = GPR_ONCE_INIT; -static double cycles_per_second = 0.0; -static void grpc_precise_clock_init() { +static long long cycles_per_second = 0; +static void gpr_precise_clock_init() { time_t start = time(NULL); - grpc_precise_clock start_time; - grpc_precise_clock end_time; + gpr_precise_clock start_cycle; + gpr_precise_clock end_cycle; while (time(NULL) == start) ; - grpc_precise_clock_now(&start_time); + gpr_get_cycle_counter(&start_cycle); while (time(NULL) == start + 1) ; - grpc_precise_clock_now(&end_time); - cycles_per_second = end_time - start_time; + gpr_get_cycle_counter(&end_cycle); + cycles_per_second = end_cycle - start_cycle; } + static double grpc_precise_clock_scaling_factor() { gpr_once_init(&precise_clock_init, grpc_precise_clock_init); return 1e6 / cycles_per_second; } -#define GRPC_PRECISE_CLOCK_FORMAT "%f" -#define GRPC_PRECISE_CLOCK_PRINTF_ARGS(clk) \ - (*(clk)*grpc_precise_clock_scaling_factor()) -#else -typedef struct grpc_precise_clock grpc_precise_clock; -struct grpc_precise_clock { - gpr_timespec clock; -}; -static void grpc_precise_clock_now(grpc_precise_clock* clk) { - clk->clock = gpr_now(GPR_CLOCK_REALTIME); + +static void gpr_precise_clock_now(gpr_timespec *clk) { + long long int counter; + gpr_get_cycle_counter(&counter); + clk->clock = GPR_CLOCK_REALTIME; + clk->tv_sec = counter / cycles_per_second; + clk->tv_nsec = counter % cycles_per_second; } -#define GRPC_PRECISE_CLOCK_FORMAT "%ld.%09d" -#define GRPC_PRECISE_CLOCK_PRINTF_ARGS(clk) \ - (clk)->clock.tv_sec, (clk)->clock.tv_nsec -static void grpc_precise_clock_print(const grpc_precise_clock* clk, FILE* fp) { - fprintf(fp, "%ld.%09d", clk->clock.tv_sec, clk->clock.tv_nsec); + +#else /* GRPC_TIMERS_RDTSC */ +static void gpr_precise_clock_now(gpr_timespec *clk) { + *clk = gpr_now(GPR_CLOCK_REALTIME); + clk->clock_type = GPR_CLOCK_PRECISE; } #endif /* GRPC_TIMERS_RDTSC */ -#endif /* GRPC_CORE_PROFILING_TIMERS_PRECISECLOCK_H */ +#endif /* GRPC_CORE_SUPPORT_TIME_PRECISE_ */ diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c index 7f64c80e27..f794855429 100644 --- a/src/core/support/time_win32.c +++ b/src/core/support/time_win32.c @@ -38,6 +38,7 @@ #ifdef GPR_WIN32 #include <grpc/support/time.h> +#include <src/core/support/time_precise.h> #include <sys/timeb.h> static LARGE_INTEGER g_start_time; @@ -68,6 +69,9 @@ gpr_timespec gpr_now(gpr_clock_type clock) { now_tv.tv_sec = (time_t)now_dbl; now_tv.tv_nsec = (int)((now_dbl - (double)now_tv.tv_sec) * 1e9); break; + case GPR_CLOCK_PRECISE: + gpr_precise_clock_now(&now_tv); + break; } return now_tv; } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 33f277da46..4426bbbce9 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1046,10 +1046,11 @@ static int prepare_application_metadata(grpc_call *call, size_t count, (const gpr_uint8 *)md->value, md->value_length, 1); if (!grpc_mdstr_is_legal_header(l->md->key)) { - gpr_log(GPR_ERROR, "attempt to send invalid metadata key"); + gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s", + grpc_mdstr_as_c_string(l->md->key)); return 0; } else if (!grpc_mdstr_is_bin_suffixed(l->md->key) && - !grpc_mdstr_is_legal_header(l->md->value)) { + !grpc_mdstr_is_legal_nonbin_header(l->md->value)) { gpr_log(GPR_ERROR, "attempt to send invalid metadata value"); return 0; } diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index e50251566d..586402e21c 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -40,6 +40,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/client_config/resolver_registry.h" #include "src/core/iomgr/iomgr.h" #include "src/core/support/string.h" #include "src/core/surface/call.h" @@ -70,6 +71,7 @@ struct grpc_channel { grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; + grpc_mdelem *default_authority; /** mdelem for grpc-status: 0 thru grpc-status: 2 */ grpc_mdelem *grpc_status_elem[NUM_CACHED_STATUS_ELEMS]; @@ -134,10 +136,47 @@ grpc_channel *grpc_channel_create_from_filters( } else { channel->max_message_length = args->args[i].value.integer; } + } else if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) { + if (args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "%s: must be an string", + GRPC_ARG_DEFAULT_AUTHORITY); + } else { + if (channel->default_authority) { + /* setting this takes precedence over anything else */ + GRPC_MDELEM_UNREF(channel->default_authority); + } + channel->default_authority = grpc_mdelem_from_strings( + mdctx, ":authority", args->args[i].value.string); + } + } else if (0 == + strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) { + if (args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "%s: must be an string", + GRPC_SSL_TARGET_NAME_OVERRIDE_ARG); + } else { + if (channel->default_authority) { + /* other ways of setting this (notably ssl) take precedence */ + gpr_log(GPR_ERROR, "%s: default host already set some other way", + GRPC_ARG_DEFAULT_AUTHORITY); + } else { + channel->default_authority = grpc_mdelem_from_strings( + mdctx, ":authority", args->args[i].value.string); + } + } } } } + if (channel->is_client && channel->default_authority == NULL && + target != NULL) { + char *default_authority = grpc_get_default_authority(target); + if (default_authority) { + channel->default_authority = grpc_mdelem_from_strings( + channel->metadata_context, ":authority", default_authority); + } + gpr_free(default_authority); + } + grpc_channel_stack_init(filters, num_filters, channel, args, channel->metadata_context, CHANNEL_STACK_FROM_CHANNEL(channel)); @@ -161,6 +200,8 @@ static grpc_call *grpc_channel_create_call_internal( send_metadata[num_metadata++] = path_mdelem; if (authority_mdelem != NULL) { send_metadata[num_metadata++] = authority_mdelem; + } else if (channel->default_authority != NULL) { + send_metadata[num_metadata++] = GRPC_MDELEM_REF(channel->default_authority); } return grpc_call_create(channel, parent_call, propagation_mask, cq, NULL, @@ -251,6 +292,9 @@ static void destroy_channel(void *p, int ok) { } gpr_free(rc); } + if (channel->default_authority != NULL) { + GRPC_MDELEM_UNREF(channel->default_authority); + } grpc_mdctx_unref(channel->metadata_context); gpr_mu_destroy(&channel->registered_call_mu); gpr_free(channel->target); diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 82ddfac757..707251da89 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -38,6 +38,7 @@ #include <grpc/support/alloc.h> +#include "src/core/census/grpc_filter.h" #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" #include "src/core/channel/compress_filter.h" @@ -165,10 +166,9 @@ grpc_channel *grpc_insecure_channel_create(const char *target, grpc_mdctx *mdctx = grpc_mdctx_create(); int n = 0; GPR_ASSERT(!reserved); - /* TODO(census) if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; - } */ + } filters[n++] = &grpc_compress_filter; filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index d9044549f2..0d48cd42d7 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -86,11 +86,11 @@ void grpc_init(void) { if (++g_initializations == 1) { gpr_time_init(); grpc_resolver_registry_init("dns:///"); - grpc_register_resolver_type("dns", grpc_dns_resolver_factory_create()); - grpc_register_resolver_type("ipv4", grpc_ipv4_resolver_factory_create()); - grpc_register_resolver_type("ipv6", grpc_ipv6_resolver_factory_create()); + grpc_register_resolver_type(grpc_dns_resolver_factory_create()); + grpc_register_resolver_type(grpc_ipv4_resolver_factory_create()); + grpc_register_resolver_type(grpc_ipv6_resolver_factory_create()); #ifdef GPR_POSIX_SOCKET - grpc_register_resolver_type("unix", grpc_unix_resolver_factory_create()); + grpc_register_resolver_type(grpc_unix_resolver_factory_create()); #endif grpc_register_tracer("channel", &grpc_trace_channel); grpc_register_tracer("surface", &grpc_surface_trace); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 5b03ba95a7..eccee24698 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -38,6 +38,7 @@ #include <grpc/support/alloc.h> +#include "src/core/census/grpc_filter.h" #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" #include "src/core/channel/compress_filter.h" @@ -217,10 +218,9 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, args_copy = grpc_channel_args_copy_and_add( new_args_from_connector != NULL ? new_args_from_connector : args, &connector_arg, 1); - /* TODO(census) if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; - } */ + } filters[n++] = &grpc_compress_filter; filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 1c402418e8..292bf6fab8 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -41,7 +41,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> -#include "src/core/channel/census_filter.h" +#include "src/core/census/grpc_filter.h" #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" #include "src/core/iomgr/iomgr.h" @@ -821,10 +821,9 @@ grpc_server *grpc_server_create_from_filters( server->channel_filters = gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *)); server->channel_filters[0] = &server_surface_filter; - /* TODO(census): restore this once we rework census filter if (census_enabled) { server->channel_filters[1] = &grpc_server_census_filter; - } */ + } for (i = 0; i < filter_count; i++) { server->channel_filters[i + 1 + census_enabled] = filters[i]; } diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 0f04169741..1ea697f71e 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -66,6 +66,8 @@ typedef struct { size_t header_idx; /* was the last frame emitted a header? (if yes, we'll need a CONTINUATION */ gpr_uint8 last_was_header; + /* have we seen a regular (non-colon-prefixed) header yet? */ + gpr_uint8 seen_regular_header; /* output stream id */ gpr_uint32 stream_id; gpr_slice_buffer *output; @@ -361,6 +363,15 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c, gpr_uint32 indices_key; int should_add_elem; + GPR_ASSERT (GPR_SLICE_LENGTH(elem->key->slice) > 0); + if (GPR_SLICE_START_PTR(elem->key->slice)[0] != ':') { /* regular header */ + st->seen_regular_header = 1; + } else if (st->seen_regular_header != 0) { /* reserved header */ + gpr_log(GPR_ERROR, + "Reserved header (colon-prefixed) happening after regular ones."); + abort(); + } + inc_filter(HASH_FRAGMENT_1(elem_hash), &c->filter_elems_sum, c->filter_elems); /* is this elem currently in the decoders table? */ @@ -566,6 +577,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, st.cur_frame_type = NONE; st.last_was_header = 0; + st.seen_regular_header = 0; st.stream_id = stream_id; st.output = output; diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index f92e87e9dd..3fd21a2f5d 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -681,16 +681,34 @@ void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, void grpc_mdctx_unlock(grpc_mdctx *ctx) { unlock(ctx); } -int grpc_mdstr_is_legal_header(grpc_mdstr *s) { - /* TODO(ctiller): consider caching this, or computing it on construction */ +static int conforms_to(grpc_mdstr *s, const gpr_uint8 *legal_bits) { const gpr_uint8 *p = GPR_SLICE_START_PTR(s->slice); const gpr_uint8 *e = GPR_SLICE_END_PTR(s->slice); for (; p != e; p++) { - if (*p < 32 || *p > 126) return 0; + int idx = *p; + int byte = idx / 8; + int bit = idx % 8; + if ((legal_bits[byte] & (1 << bit)) == 0) return 0; } return 1; } +int grpc_mdstr_is_legal_header(grpc_mdstr *s) { + static const gpr_uint8 legal_header_bits[256 / 8] = { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x20, 0xff, 0x03, 0x00, 0x00, 0x00, + 0x80, 0xfe, 0xff, 0xff, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + return conforms_to(s, legal_header_bits); +} + +int grpc_mdstr_is_legal_nonbin_header(grpc_mdstr *s) { + static const gpr_uint8 legal_header_bits[256 / 8] = { + 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + return conforms_to(s, legal_header_bits); +} + int grpc_mdstr_is_bin_suffixed(grpc_mdstr *s) { /* TODO(ctiller): consider caching this */ return grpc_is_binary_header((const char *)GPR_SLICE_START_PTR(s->slice), diff --git a/src/core/transport/metadata.h b/src/core/transport/metadata.h index a7af49ba55..eb17747be7 100644 --- a/src/core/transport/metadata.h +++ b/src/core/transport/metadata.h @@ -154,6 +154,7 @@ void grpc_mdelem_unref(grpc_mdelem *md); const char *grpc_mdstr_as_c_string(grpc_mdstr *s); int grpc_mdstr_is_legal_header(grpc_mdstr *s); +int grpc_mdstr_is_legal_nonbin_header(grpc_mdstr *s); int grpc_mdstr_is_bin_suffixed(grpc_mdstr *s); /* Batch mode metadata functions. |