diff options
Diffstat (limited to 'src/core/ext')
51 files changed, 442 insertions, 513 deletions
diff --git a/src/core/ext/README.md b/src/core/ext/README.md new file mode 100644 index 0000000000..0812b20823 --- /dev/null +++ b/src/core/ext/README.md @@ -0,0 +1,5 @@ +Optional plugins for gRPC Core: Modules in this directory extend gRPC Core in +useful ways. All optional code belongs here. + +NOTE: The movement of code between lib and ext is an ongoing effort, so this +directory currently contains too much of the core library. diff --git a/src/core/ext/census/grpc_plugin.c b/src/core/ext/census/grpc_plugin.c index e43ceafd0c..c9fe453af8 100644 --- a/src/core/ext/census/grpc_plugin.c +++ b/src/core/ext/census/grpc_plugin.c @@ -51,7 +51,8 @@ static bool is_census_enabled(const grpc_channel_args *a) { return census_enabled(); } -static bool maybe_add_census_filter(grpc_channel_stack_builder *builder, +static bool maybe_add_census_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); diff --git a/src/core/ext/census/tracing.c b/src/core/ext/census/tracing.c index 8f0e12296d..3b5d6dab2b 100644 --- a/src/core/ext/census/tracing.c +++ b/src/core/ext/census/tracing.c @@ -31,19 +31,15 @@ * */ -//#include "src/core/ext/census/tracing.h" - #include <grpc/census.h> /* TODO(aveitch): These are all placeholder implementations. */ -// int census_trace_mask(const census_context *context) { -// return CENSUS_TRACE_MASK_NONE; -// } - -// void census_set_trace_mask(int trace_mask) {} +int census_trace_mask(const census_context *context) { + return CENSUS_TRACE_MASK_NONE; +} -// void census_trace_print(census_context *context, uint32_t type, -// const char *buffer, size_t n) {} +void census_set_trace_mask(int trace_mask) {} -// void SetTracerParams(const Params& params); +void census_trace_print(census_context *context, uint32_t type, + const char *buffer, size_t n) {} diff --git a/src/core/ext/client_channel/channel_connectivity.c b/src/core/ext/client_channel/channel_connectivity.c index b10f444b63..dd70bc2c6c 100644 --- a/src/core/ext/client_channel/channel_connectivity.c +++ b/src/core/ext/client_channel/channel_connectivity.c @@ -76,6 +76,7 @@ typedef struct { gpr_mu mu; callback_phase phase; grpc_closure on_complete; + grpc_closure on_timeout; grpc_timer alarm; grpc_connectivity_state state; grpc_completion_queue *cq; @@ -200,6 +201,8 @@ void grpc_channel_watch_connectivity_state( gpr_mu_init(&w->mu); grpc_closure_init(&w->on_complete, watch_complete, w, grpc_schedule_on_exec_ctx); + grpc_closure_init(&w->on_timeout, timeout_complete, w, + grpc_schedule_on_exec_ctx); w->phase = WAITING; w->state = last_observed_state; w->cq = cq; @@ -208,7 +211,7 @@ void grpc_channel_watch_connectivity_state( grpc_timer_init(&exec_ctx, &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC)); + &w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); if (client_channel_elem->filter == &grpc_client_channel_filter) { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index a762b8917e..2f25fef9a7 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -83,8 +83,12 @@ static void *method_parameters_copy(void *value) { return new_value; } +static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *p) { + gpr_free(p); +} + static const grpc_mdstr_hash_table_vtable method_parameters_vtable = { - gpr_free, method_parameters_copy}; + method_parameters_free, method_parameters_copy}; static void *method_parameters_create_from_json(const grpc_json *json) { wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET; @@ -328,7 +332,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_service_config_create(service_config_json); if (service_config != NULL) { method_params_table = grpc_service_config_create_method_config_table( - service_config, method_parameters_create_from_json, + exec_ctx, service_config, method_parameters_create_from_json, &method_parameters_vtable); grpc_service_config_destroy(service_config); } @@ -337,7 +341,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, // be pointing to data inside chand->resolver_result. // The copy will be saved in chand->lb_policy_name below. lb_policy_name = gpr_strdup(lb_policy_name); - grpc_channel_args_destroy(chand->resolver_result); + grpc_channel_args_destroy(exec_ctx, chand->resolver_result); chand->resolver_result = NULL; } @@ -358,7 +362,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->service_config_json = service_config_json; } if (chand->method_params_table != NULL) { - grpc_mdstr_hash_table_unref(chand->method_params_table); + grpc_mdstr_hash_table_unref(exec_ctx, chand->method_params_table); } chand->method_params_table = method_params_table; if (lb_policy != NULL) { @@ -554,7 +558,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, gpr_free(chand->lb_policy_name); gpr_free(chand->service_config_json); if (chand->method_params_table != NULL) { - grpc_mdstr_hash_table_unref(chand->method_params_table); + grpc_mdstr_hash_table_unref(exec_ctx, chand->method_params_table); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); @@ -1001,8 +1005,8 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&chand->mu); // If the method config table was present, use it. if (method_params_table != NULL) { - const method_parameters *method_params = - grpc_method_config_table_get(method_params_table, calld->path); + const method_parameters *method_params = grpc_method_config_table_get( + exec_ctx, method_params_table, calld->path); if (method_params != NULL) { const bool have_method_timeout = gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0; @@ -1025,7 +1029,7 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&calld->mu); } } - grpc_mdstr_hash_table_unref(method_params_table); + grpc_mdstr_hash_table_unref(exec_ctx, method_params_table); } } GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); @@ -1066,8 +1070,8 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_mdstr_hash_table *method_params_table = grpc_mdstr_hash_table_ref(chand->method_params_table); gpr_mu_unlock(&chand->mu); - method_parameters *method_params = - grpc_method_config_table_get(method_params_table, args->path); + method_parameters *method_params = grpc_method_config_table_get( + exec_ctx, method_params_table, args->path); if (method_params != NULL) { if (gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) { @@ -1080,7 +1084,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, method_params->wait_for_ready; } } - grpc_mdstr_hash_table_unref(method_params_table); + grpc_mdstr_hash_table_unref(exec_ctx, method_params_table); } else { gpr_mu_unlock(&chand->mu); } @@ -1109,7 +1113,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, void *and_free_memory) { call_data *calld = elem->call_data; grpc_deadline_state_destroy(exec_ctx, elem); - GRPC_MDSTR_UNREF(calld->path); + GRPC_MDSTR_UNREF(exec_ctx, calld->path); GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { diff --git a/src/core/ext/client_channel/client_channel_factory.c b/src/core/ext/client_channel/client_channel_factory.c index 4eb35dfcf7..d2707a1556 100644 --- a/src/core/ext/client_channel/client_channel_factory.c +++ b/src/core/ext/client_channel/client_channel_factory.c @@ -61,12 +61,10 @@ static void* factory_arg_copy(void* factory) { return factory; } -static void factory_arg_destroy(void* factory) { +static void factory_arg_destroy(grpc_exec_ctx* exec_ctx, void* factory) { // TODO(roth): Remove local exec_ctx when // https://github.com/grpc/grpc/pull/8705 is merged. - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_client_channel_factory_unref(&exec_ctx, factory); - grpc_exec_ctx_finish(&exec_ctx); + grpc_client_channel_factory_unref(exec_ctx, factory); } static int factory_arg_cmp(void* factory1, void* factory2) { diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c index a3e5079843..d50bba60f6 100644 --- a/src/core/ext/client_channel/client_channel_plugin.c +++ b/src/core/ext/client_channel/client_channel_plugin.c @@ -38,17 +38,20 @@ #include <grpc/support/alloc.h> #include "src/core/ext/client_channel/client_channel.h" +#include "src/core/ext/client_channel/http_connect_handshaker.h" #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/ext/client_channel/subchannel_index.h" #include "src/core/lib/surface/channel_init.h" -static bool append_filter(grpc_channel_stack_builder *builder, void *arg) { +static bool append_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg) { return grpc_channel_stack_builder_append_filter( builder, (const grpc_channel_filter *)arg, NULL, NULL); } -static bool set_default_host_if_unset(grpc_channel_stack_builder *builder, +static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *unused) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); @@ -66,9 +69,10 @@ static bool set_default_host_if_unset(grpc_channel_stack_builder *builder, arg.key = GRPC_ARG_DEFAULT_AUTHORITY; arg.value.string = default_authority; grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); - grpc_channel_stack_builder_set_channel_arguments(builder, new_args); + grpc_channel_stack_builder_set_channel_arguments(exec_ctx, builder, + new_args); gpr_free(default_authority); - grpc_channel_args_destroy(new_args); + grpc_channel_args_destroy(exec_ctx, new_args); } return true; } @@ -81,6 +85,7 @@ void grpc_client_channel_init(void) { set_default_host_if_unset, NULL); grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter, (void *)&grpc_client_channel_filter); + grpc_http_connect_register_handshaker_factory(); } void grpc_client_channel_shutdown(void) { diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c index a0fc95e7bb..fba32561ac 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.c +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -44,8 +44,10 @@ #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/ext/client_channel/uri_parser.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/http/format_request.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/env.h" typedef struct http_connect_handshaker { @@ -53,6 +55,8 @@ typedef struct http_connect_handshaker { grpc_handshaker base; char* proxy_server; + grpc_http_header* headers; + size_t num_headers; gpr_refcount refcount; gpr_mu mu; @@ -83,11 +87,17 @@ static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx, grpc_endpoint_destroy(exec_ctx, handshaker->endpoint_to_destroy); } if (handshaker->read_buffer_to_destroy != NULL) { - grpc_slice_buffer_destroy(handshaker->read_buffer_to_destroy); + grpc_slice_buffer_destroy_internal(exec_ctx, + handshaker->read_buffer_to_destroy); gpr_free(handshaker->read_buffer_to_destroy); } gpr_free(handshaker->proxy_server); - grpc_slice_buffer_destroy(&handshaker->write_buffer); + for (size_t i = 0; i < handshaker->num_headers; ++i) { + gpr_free(handshaker->headers[i].key); + gpr_free(handshaker->headers[i].value); + } + gpr_free(handshaker->headers); + grpc_slice_buffer_destroy_internal(exec_ctx, &handshaker->write_buffer); grpc_http_parser_destroy(&handshaker->http_parser); grpc_http_response_destroy(&handshaker->http_response); gpr_free(handshaker); @@ -97,12 +107,12 @@ static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx, // Set args fields to NULL, saving the endpoint and read buffer for // later destruction. static void cleanup_args_for_failure_locked( - http_connect_handshaker* handshaker) { + grpc_exec_ctx* exec_ctx, http_connect_handshaker* handshaker) { handshaker->endpoint_to_destroy = handshaker->args->endpoint; handshaker->args->endpoint = NULL; handshaker->read_buffer_to_destroy = handshaker->args->read_buffer; handshaker->args->read_buffer = NULL; - grpc_channel_args_destroy(handshaker->args->args); + grpc_channel_args_destroy(exec_ctx, handshaker->args->args); handshaker->args->args = NULL; } @@ -125,7 +135,7 @@ static void handshake_failed_locked(grpc_exec_ctx* exec_ctx, grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint); // Not shutting down, so the handshake failed. Clean up before // invoking the callback. - cleanup_args_for_failure_locked(handshaker); + cleanup_args_for_failure_locked(exec_ctx, handshaker); // Set shutdown to true so that subsequent calls to // http_connect_handshaker_shutdown() do nothing. handshaker->shutdown = true; @@ -193,7 +203,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, &handshaker->args->read_buffer->slices[i + 1], handshaker->args->read_buffer->count - i - 1); grpc_slice_buffer_swap(handshaker->args->read_buffer, &tmp_buffer); - grpc_slice_buffer_destroy(&tmp_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &tmp_buffer); break; } } @@ -210,7 +220,8 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, // complete (e.g., handling chunked transfer encoding or looking // at the Content-Length: header). if (handshaker->http_parser.state != GRPC_HTTP_BODY) { - grpc_slice_buffer_reset_and_unref(handshaker->args->read_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + handshaker->args->read_buffer); grpc_endpoint_read(exec_ctx, handshaker->args->endpoint, handshaker->args->read_buffer, &handshaker->response_read_closure); @@ -255,7 +266,7 @@ static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, if (!handshaker->shutdown) { handshaker->shutdown = true; grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint); - cleanup_args_for_failure_locked(handshaker); + cleanup_args_for_failure_locked(exec_ctx, handshaker); } gpr_mu_unlock(&handshaker->mu); } @@ -286,6 +297,8 @@ static void http_connect_handshaker_do_handshake( request.host = server_name; request.http.method = "CONNECT"; request.http.path = server_name; + request.http.hdrs = handshaker->headers; + request.http.hdr_count = handshaker->num_headers; request.handshaker = &grpc_httpcli_plaintext; grpc_slice request_slice = grpc_httpcli_format_connect_request(&request); grpc_slice_buffer_add(&handshaker->write_buffer, request_slice); @@ -303,7 +316,9 @@ static const grpc_handshaker_vtable http_connect_handshaker_vtable = { http_connect_handshaker_destroy, http_connect_handshaker_shutdown, http_connect_handshaker_do_handshake}; -grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server) { +grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, + grpc_http_header* headers, + size_t num_headers) { GPR_ASSERT(proxy_server != NULL); http_connect_handshaker* handshaker = gpr_malloc(sizeof(*handshaker)); memset(handshaker, 0, sizeof(*handshaker)); @@ -311,6 +326,14 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server) { gpr_mu_init(&handshaker->mu); gpr_ref_init(&handshaker->refcount, 1); handshaker->proxy_server = gpr_strdup(proxy_server); + if (num_headers > 0) { + handshaker->headers = gpr_malloc(sizeof(grpc_http_header) * num_headers); + for (size_t i = 0; i < num_headers; ++i) { + handshaker->headers[i].key = gpr_strdup(headers[i].key); + handshaker->headers[i].value = gpr_strdup(headers[i].value); + } + handshaker->num_headers = num_headers; + } grpc_slice_buffer_init(&handshaker->write_buffer); grpc_closure_init(&handshaker->request_done_closure, on_write_done, handshaker, grpc_schedule_on_exec_ctx); @@ -344,3 +367,33 @@ done: grpc_uri_destroy(uri); return proxy_name; } + +// +// handshaker factory +// + +static void handshaker_factory_add_handshakers( + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory* factory, + const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) { + char* proxy_name = grpc_get_http_proxy_server(); + if (proxy_name != NULL) { + grpc_handshake_manager_add( + handshake_mgr, + grpc_http_connect_handshaker_create(proxy_name, NULL, 0)); + gpr_free(proxy_name); + } +} + +static void handshaker_factory_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker_factory* factory) {} + +static const grpc_handshaker_factory_vtable handshaker_factory_vtable = { + handshaker_factory_add_handshakers, handshaker_factory_destroy}; + +static grpc_handshaker_factory handshaker_factory = { + &handshaker_factory_vtable}; + +void grpc_http_connect_register_handshaker_factory() { + grpc_handshaker_factory_register(true /* at_start */, HANDSHAKER_CLIENT, + &handshaker_factory); +} diff --git a/src/core/ext/client_channel/http_connect_handshaker.h b/src/core/ext/client_channel/http_connect_handshaker.h index ea293852e6..c2e68de716 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.h +++ b/src/core/ext/client_channel/http_connect_handshaker.h @@ -35,12 +35,18 @@ #define GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H #include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/http/parser.h" -/// Does NOT take ownership of \a proxy_server. -grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server); +/// Creates a new HTTP CONNECT handshaker. +grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, + grpc_http_header* headers, + size_t num_headers); /// Returns the name of the proxy to use, or NULL if no proxy is configured. /// Caller takes ownership of result. char* grpc_get_http_proxy_server(); +/// Registers handshaker factory. +void grpc_http_connect_register_handshaker_factory(); + #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H */ diff --git a/src/core/ext/client_channel/lb_policy_factory.c b/src/core/ext/client_channel/lb_policy_factory.c index 8a474c8818..7af9bb0411 100644 --- a/src/core/ext/client_channel/lb_policy_factory.c +++ b/src/core/ext/client_channel/lb_policy_factory.c @@ -112,11 +112,13 @@ int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1, return 0; } -void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses) { +void grpc_lb_addresses_destroy(grpc_exec_ctx* exec_ctx, + grpc_lb_addresses* addresses) { for (size_t i = 0; i < addresses->num_addresses; ++i) { gpr_free(addresses->addresses[i].balancer_name); if (addresses->addresses[i].user_data != NULL) { - addresses->user_data_vtable->destroy(addresses->addresses[i].user_data); + addresses->user_data_vtable->destroy(exec_ctx, + addresses->addresses[i].user_data); } } gpr_free(addresses->addresses); @@ -126,8 +128,8 @@ void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses) { static void* lb_addresses_copy(void* addresses) { return grpc_lb_addresses_copy(addresses); } -static void lb_addresses_destroy(void* addresses) { - grpc_lb_addresses_destroy(addresses); +static void lb_addresses_destroy(grpc_exec_ctx* exec_ctx, void* addresses) { + grpc_lb_addresses_destroy(exec_ctx, addresses); } static int lb_addresses_cmp(void* addresses1, void* addresses2) { return grpc_lb_addresses_cmp(addresses1, addresses2); diff --git a/src/core/ext/client_channel/lb_policy_factory.h b/src/core/ext/client_channel/lb_policy_factory.h index 79b3dee259..9b8b03f982 100644 --- a/src/core/ext/client_channel/lb_policy_factory.h +++ b/src/core/ext/client_channel/lb_policy_factory.h @@ -64,7 +64,7 @@ typedef struct grpc_lb_address { typedef struct grpc_lb_user_data_vtable { void *(*copy)(void *); - void (*destroy)(void *); + void (*destroy)(grpc_exec_ctx *exec_ctx, void *); int (*cmp)(void *, void *); } grpc_lb_user_data_vtable; @@ -96,7 +96,8 @@ int grpc_lb_addresses_cmp(const grpc_lb_addresses *addresses1, const grpc_lb_addresses *addresses2); /** Destroys \a addresses. */ -void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses); +void grpc_lb_addresses_destroy(grpc_exec_ctx *exec_ctx, + grpc_lb_addresses *addresses); /** Returns a channel arg containing \a addresses. */ grpc_arg grpc_lb_addresses_create_channel_arg( diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 87f0ef298a..1bac82b451 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -46,6 +46,7 @@ #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/backoff.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" @@ -108,6 +109,9 @@ struct grpc_subchannel { /** callback for connection finishing */ grpc_closure connected; + /** callback for our alarm */ + grpc_closure on_alarm; + /** pollset_set tracking who's interested in a connection being setup */ grpc_pollset_set *pollset_set; @@ -206,9 +210,9 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_subchannel *c = arg; gpr_free((void *)c->filters); - grpc_channel_args_destroy(c->args); + grpc_channel_args_destroy(exec_ctx, c->args); gpr_free(c->addr); - grpc_slice_unref(c->initial_connect_string); + grpc_slice_unref_internal(exec_ctx, c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); grpc_pollset_set_destroy(c->pollset_set); @@ -482,7 +486,8 @@ static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds", time_til_next.tv_sec, time_til_next.tv_nsec); } - grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); + grpc_closure_init(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); + grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, &c->on_alarm, now); } } @@ -603,13 +608,13 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, /* construct channel stack */ grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create(); grpc_channel_stack_builder_set_channel_arguments( - builder, c->connecting_result.channel_args); + exec_ctx, builder, c->connecting_result.channel_args); grpc_channel_stack_builder_set_transport(builder, c->connecting_result.transport); if (!grpc_channel_init_create_stack(exec_ctx, builder, GRPC_CLIENT_SUBCHANNEL)) { - grpc_channel_stack_builder_destroy(builder); + grpc_channel_stack_builder_destroy(exec_ctx, builder); abort(); /* TODO(ctiller): what to do here (previously we just crashed) */ } grpc_error *error = grpc_channel_stack_builder_finish( @@ -689,7 +694,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, } gpr_mu_unlock(&c->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected"); - grpc_channel_args_destroy(delete_channel_args); + grpc_channel_args_destroy(exec_ctx, delete_channel_args); } /* diff --git a/src/core/ext/client_channel/subchannel_index.c b/src/core/ext/client_channel/subchannel_index.c index a1ba5e945c..1ebe03ef11 100644 --- a/src/core/ext/client_channel/subchannel_index.c +++ b/src/core/ext/client_channel/subchannel_index.c @@ -128,7 +128,7 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *k) { grpc_connector_unref(exec_ctx, k->connector); gpr_free((grpc_channel_args *)k->args.filters); - grpc_channel_args_destroy((grpc_channel_args *)k->args.args); + grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)k->args.args); gpr_free(k->args.addr); gpr_free(k); } diff --git a/src/core/ext/client_channel/uri_parser.c b/src/core/ext/client_channel/uri_parser.c index 0fbc542ef8..f8c946b275 100644 --- a/src/core/ext/client_channel/uri_parser.c +++ b/src/core/ext/client_channel/uri_parser.c @@ -42,7 +42,6 @@ #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> -#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" /** a size_t default value... maps to all 1's */ @@ -138,7 +137,6 @@ static int parse_fragment_or_query(const char *uri_text, size_t *i) { return 1; } -static void do_nothing(void *ignored) {} static void parse_query_parts(grpc_uri *uri) { static const char *QUERY_PARTS_SEPARATOR = "&"; static const char *QUERY_PARTS_VALUE_SEPARATOR = "="; @@ -149,38 +147,32 @@ static void parse_query_parts(grpc_uri *uri) { uri->num_query_parts = 0; return; } - grpc_slice query_slice = - grpc_slice_new(uri->query, strlen(uri->query), do_nothing); - grpc_slice_buffer query_parts; /* the &-separated elements of the query */ - grpc_slice_buffer query_param_parts; /* the =-separated subelements */ - grpc_slice_buffer_init(&query_parts); - grpc_slice_buffer_init(&query_param_parts); - - grpc_slice_split(query_slice, QUERY_PARTS_SEPARATOR, &query_parts); - uri->query_parts = gpr_malloc(query_parts.count * sizeof(char *)); - uri->query_parts_values = gpr_malloc(query_parts.count * sizeof(char *)); - uri->num_query_parts = query_parts.count; - for (size_t i = 0; i < query_parts.count; i++) { - grpc_slice_split(query_parts.slices[i], QUERY_PARTS_VALUE_SEPARATOR, - &query_param_parts); - GPR_ASSERT(query_param_parts.count > 0); - uri->query_parts[i] = - grpc_dump_slice(query_param_parts.slices[0], GPR_DUMP_ASCII); - if (query_param_parts.count > 1) { + gpr_string_split(uri->query, QUERY_PARTS_SEPARATOR, &uri->query_parts, + &uri->num_query_parts); + uri->query_parts_values = gpr_malloc(uri->num_query_parts * sizeof(char **)); + for (size_t i = 0; i < uri->num_query_parts; i++) { + char **query_param_parts; + size_t num_query_param_parts; + char *full = uri->query_parts[i]; + gpr_string_split(full, QUERY_PARTS_VALUE_SEPARATOR, &query_param_parts, + &num_query_param_parts); + GPR_ASSERT(num_query_param_parts > 0); + uri->query_parts[i] = query_param_parts[0]; + if (num_query_param_parts > 1) { /* TODO(dgq): only the first value after the separator is considered. * Perhaps all chars after the first separator for the query part should * be included, even if they include the separator. */ - uri->query_parts_values[i] = - grpc_dump_slice(query_param_parts.slices[1], GPR_DUMP_ASCII); + uri->query_parts_values[i] = query_param_parts[1]; } else { uri->query_parts_values[i] = NULL; } - grpc_slice_buffer_reset_and_unref(&query_param_parts); + for (size_t j = 2; j < num_query_param_parts; j++) { + gpr_free(query_param_parts[j]); + } + gpr_free(query_param_parts); + gpr_free(full); } - grpc_slice_buffer_destroy(&query_parts); - grpc_slice_buffer_destroy(&query_param_parts); - grpc_slice_unref(query_slice); } grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index bb0ee9d95c..97f98df03a 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -117,6 +117,7 @@ #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/backoff.h" #include "src/core/lib/support/string.h" @@ -326,6 +327,9 @@ typedef struct glb_lb_policy { /* A response from the LB server has been received. Process it */ grpc_closure lb_on_response_received; + /* LB call retry timer callback. */ + grpc_closure lb_on_call_retry; + grpc_call *lb_call; /* streaming call to the LB server, */ grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */ @@ -386,8 +390,8 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, static void *lb_token_copy(void *token) { return token == NULL ? NULL : GRPC_MDELEM_REF(token); } -static void lb_token_destroy(void *token) { - if (token != NULL) GRPC_MDELEM_UNREF(token); +static void lb_token_destroy(grpc_exec_ctx *exec_ctx, void *token) { + if (token != NULL) GRPC_MDELEM_UNREF(exec_ctx, token); } static int lb_token_cmp(void *token1, void *token2) { if (token1 > token2) return 1; @@ -421,7 +425,7 @@ static void parse_server(const grpc_grpclb_server *server, /* Returns addresses extracted from \a serverlist. */ static grpc_lb_addresses *process_serverlist_locked( - const grpc_grpclb_serverlist *serverlist) { + grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist) { size_t num_valid = 0; /* first pass: count how many are valid in order to allocate the necessary * memory in a single block */ @@ -457,8 +461,8 @@ static grpc_lb_addresses *process_serverlist_locked( strnlen(server->load_balance_token, lb_token_max_length); grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( (uint8_t *)server->load_balance_token, lb_token_length); - user_data = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LB_TOKEN, - lb_token_mdstr); + user_data = grpc_mdelem_from_metadata_strings( + exec_ctx, GRPC_MDSTR_LB_TOKEN, lb_token_mdstr); } else { char *uri = grpc_sockaddr_to_uri(&addr); gpr_log(GPR_INFO, @@ -581,7 +585,8 @@ static grpc_lb_policy *create_rr_locked( grpc_lb_policy_args args; memset(&args, 0, sizeof(args)); args.client_channel_factory = glb_policy->cc_factory; - grpc_lb_addresses *addresses = process_serverlist_locked(serverlist); + grpc_lb_addresses *addresses = + process_serverlist_locked(exec_ctx, serverlist); // Replace the LB addresses in the channel args that we pass down to // the subchannel. @@ -593,8 +598,8 @@ static grpc_lb_policy *create_rr_locked( grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); GPR_ASSERT(rr != NULL); - grpc_lb_addresses_destroy(addresses); - grpc_channel_args_destroy(args.args); + grpc_lb_addresses_destroy(exec_ctx, addresses); + grpc_channel_args_destroy(exec_ctx, args.args); return rr; } @@ -840,7 +845,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, glb_policy->lb_channel = grpc_client_channel_factory_create_channel( exec_ctx, glb_policy->cc_factory, target_uri_str, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args); - grpc_channel_args_destroy(new_args); + grpc_channel_args_destroy(exec_ctx, new_args); gpr_free(target_uri_str); for (size_t i = 0; i < num_grpclb_addrs; i++) { @@ -866,7 +871,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GPR_ASSERT(glb_policy->pending_picks == NULL); GPR_ASSERT(glb_policy->pending_pings == NULL); gpr_free((void *)glb_policy->server_name); - grpc_channel_args_destroy(glb_policy->args); + grpc_channel_args_destroy(exec_ctx, glb_policy->args); grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = NULL; grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); @@ -1089,7 +1094,8 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void lb_call_init_locked(glb_lb_policy *glb_policy) { +static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name[0] != '\0'); GPR_ASSERT(!glb_policy->shutting_down); @@ -1098,7 +1104,7 @@ static void lb_call_init_locked(glb_lb_policy *glb_policy) { * glb_policy->base.interested_parties, which is comprised of the polling * entities from \a client_channel. */ glb_policy->lb_call = grpc_channel_create_pollset_set_call( - glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, + exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name, glb_policy->deadline, NULL); @@ -1111,7 +1117,7 @@ static void lb_call_init_locked(glb_lb_policy *glb_policy) { grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); glb_policy->lb_request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_slice_unref(request_payload_slice); + grpc_slice_unref_internal(exec_ctx, request_payload_slice); grpc_grpclb_request_destroy(request); glb_policy->lb_call_status_details = NULL; @@ -1152,7 +1158,7 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(glb_policy->lb_channel != NULL); if (glb_policy->shutting_down) return; - lb_call_init_locked(glb_policy); + lb_call_init_locked(exec_ctx, glb_policy); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)", @@ -1237,7 +1243,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_grpclb_response_parse_serverlist(response_slice); if (serverlist != NULL) { GPR_ASSERT(glb_policy->lb_call != NULL); - grpc_slice_unref(response_slice); + grpc_slice_unref_internal(exec_ctx, response_slice); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Serverlist with %lu servers received", (unsigned long)serverlist->num_servers); @@ -1281,7 +1287,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, } else { /* serverlist == NULL */ gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); - grpc_slice_unref(response_slice); + grpc_slice_unref_internal(exec_ctx, response_slice); } if (!glb_policy->shutting_down) { @@ -1361,8 +1367,10 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, } } GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); + grpc_closure_init(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer, + glb_policy, grpc_schedule_on_exec_ctx); grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, - lb_call_on_retry_timer, glb_policy, now); + &glb_policy->lb_on_call_retry, now); } gpr_mu_unlock(&glb_policy->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c index afecb716fb..e352e0396f 100644 --- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c +++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c @@ -1,35 +1,3 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ /* Automatically generated nanopb constant definitions */ /* Generated by nanopb-0.3.7-dev */ diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h index e36d0966f8..725aa7e386 100644 --- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h +++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h @@ -1,35 +1,3 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ /* Automatically generated nanopb header */ /* Generated by nanopb-0.3.7-dev */ diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 47f20a1904..cb679489c3 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -284,7 +284,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); if (sd->user_data != NULL) { GPR_ASSERT(sd->user_data_vtable != NULL); - sd->user_data_vtable->destroy(sd->user_data); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); } gpr_free(sd); } diff --git a/src/core/ext/load_reporting/load_reporting.c b/src/core/ext/load_reporting/load_reporting.c index df1ea0ec9a..37b06a737f 100644 --- a/src/core/ext/load_reporting/load_reporting.c +++ b/src/core/ext/load_reporting/load_reporting.c @@ -53,7 +53,8 @@ static bool is_load_reporting_enabled(const grpc_channel_args *a) { return false; } -static bool maybe_add_load_reporting_filter(grpc_channel_stack_builder *builder, +static bool maybe_add_load_reporting_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index 1403eb0d33..07ef10e6a8 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -68,7 +68,8 @@ typedef struct { grpc_exec_ctx *exec_ctx; } recv_md_filter_args; -static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) { +static grpc_mdelem *recv_md_filter(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_mdelem *md) { recv_md_filter_args *a = user_data; grpc_call_element *elem = a->elem; call_data *calld = elem->call_data; @@ -92,8 +93,8 @@ static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data, recv_md_filter_args a; a.elem = elem; a.exec_ctx = exec_ctx; - grpc_metadata_batch_filter(calld->recv_initial_metadata, recv_md_filter, - &a); + grpc_metadata_batch_filter(exec_ctx, calld->recv_initial_metadata, + recv_md_filter, &a); if (calld->service_method == NULL) { err = grpc_error_add_child(err, GRPC_ERROR_CREATE("Missing :path header")); @@ -192,7 +193,8 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, */ } -static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) { +static grpc_mdelem *lr_trailing_md_filter(grpc_exec_ctx *exec_ctx, + void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; @@ -216,7 +218,7 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx, calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready; op->recv_initial_metadata_ready = &calld->on_initial_md_ready; } else if (op->send_trailing_metadata) { - grpc_metadata_batch_filter(op->send_trailing_metadata, + grpc_metadata_batch_filter(exec_ctx, op->send_trailing_metadata, lr_trailing_md_filter, elem); } grpc_call_next_op(exec_ctx, elem, op); diff --git a/src/core/ext/resolver/README.md b/src/core/ext/resolver/README.md new file mode 100644 index 0000000000..b0e234e96a --- /dev/null +++ b/src/core/ext/resolver/README.md @@ -0,0 +1,4 @@ +# Resolver + +Implementations of various name resolution schemes. +See the [naming spec](/doc/naming.md). diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index efbb2be4e1..655d9dc586 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -81,6 +81,7 @@ typedef struct { /** retry timer */ bool have_retry_timer; grpc_timer retry_timer; + grpc_closure on_retry; /** retry backoff state */ gpr_backoff backoff_state; @@ -182,13 +183,13 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_arg new_arg = grpc_lb_addresses_create_channel_arg(addresses); result = grpc_channel_args_copy_and_add(r->channel_args, &new_arg, 1); grpc_resolved_addresses_destroy(r->addresses); - grpc_lb_addresses_destroy(addresses); + grpc_lb_addresses_destroy(exec_ctx, addresses); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); gpr_timespec timeout = gpr_time_sub(next_try, now); const char *msg = grpc_error_string(error); - gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); + gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", msg); grpc_error_free_string(msg); GPR_ASSERT(!r->have_retry_timer); r->have_retry_timer = true; @@ -199,11 +200,12 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, } else { gpr_log(GPR_DEBUG, "retrying immediately"); } - grpc_timer_init(exec_ctx, &r->retry_timer, next_try, dns_on_retry_timer, r, - now); + grpc_closure_init(&r->on_retry, dns_on_retry_timer, r, + grpc_schedule_on_exec_ctx); + grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now); } if (r->resolved_result != NULL) { - grpc_channel_args_destroy(r->resolved_result); + grpc_channel_args_destroy(exec_ctx, r->resolved_result); } r->resolved_result = result; r->resolved_version++; @@ -242,12 +244,12 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { dns_resolver *r = (dns_resolver *)gr; gpr_mu_destroy(&r->mu); if (r->resolved_result != NULL) { - grpc_channel_args_destroy(r->resolved_result); + grpc_channel_args_destroy(exec_ctx, r->resolved_result); } grpc_pollset_set_destroy(r->interested_parties); gpr_free(r->name_to_resolve); gpr_free(r->default_port); - grpc_channel_args_destroy(r->channel_args); + grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); } diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 55ea502c9e..c146a627cb 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -47,6 +47,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" @@ -131,8 +132,8 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); - grpc_lb_addresses_destroy(r->addresses); - grpc_channel_args_destroy(r->channel_args); + grpc_lb_addresses_destroy(exec_ctx, r->addresses); + grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); } @@ -161,7 +162,8 @@ char *unix_get_default_authority(grpc_resolver_factory *factory, static void do_nothing(void *ignored) {} -static grpc_resolver *sockaddr_create(grpc_resolver_args *args, +static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx, + grpc_resolver_args *args, int parse(grpc_uri *uri, grpc_resolved_address *dst)) { if (0 != strcmp(args->uri->authority, "")) { @@ -188,10 +190,10 @@ static grpc_resolver *sockaddr_create(grpc_resolver_args *args, gpr_free(part_str); if (errors_found) break; } - grpc_slice_buffer_destroy(&path_parts); - grpc_slice_unref(path_slice); + grpc_slice_buffer_destroy_internal(exec_ctx, &path_parts); + grpc_slice_unref_internal(exec_ctx, path_slice); if (errors_found) { - grpc_lb_addresses_destroy(addresses); + grpc_lb_addresses_destroy(exec_ctx, addresses); return NULL; } /* Instantiate resolver. */ @@ -216,7 +218,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *name##_factory_create_resolver( \ grpc_exec_ctx *exec_ctx, grpc_resolver_factory *factory, \ grpc_resolver_args *args) { \ - return sockaddr_create(args, parse_##name); \ + return sockaddr_create(exec_ctx, args, parse_##name); \ } \ static const grpc_resolver_factory_vtable name##_factory_vtable = { \ sockaddr_factory_ref, sockaddr_factory_unref, \ diff --git a/src/core/ext/transport/README.md b/src/core/ext/transport/README.md new file mode 100644 index 0000000000..2290568784 --- /dev/null +++ b/src/core/ext/transport/README.md @@ -0,0 +1 @@ +Transports for gRPC diff --git a/src/core/ext/transport/chttp2/README.md b/src/core/ext/transport/chttp2/README.md new file mode 100644 index 0000000000..8880a47460 --- /dev/null +++ b/src/core/ext/transport/chttp2/README.md @@ -0,0 +1 @@ +CHTTP2 - gRPC's implementation of a HTTP2 based transport diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index c807342ebc..2c5dfaea60 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -46,8 +46,9 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/iomgr/tcp_client.h" -#include "src/core/lib/security/transport/security_connector.h" +#include "src/core/lib/slice/slice_internal.h" typedef struct { grpc_connector base; @@ -58,9 +59,6 @@ typedef struct { bool shutdown; bool connecting; - grpc_chttp2_add_handshakers_func add_handshakers; - void *add_handshakers_user_data; - grpc_closure *notify; grpc_connect_in_args args; grpc_connect_out_args *result; @@ -124,8 +122,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, // point this can be removed. grpc_endpoint_shutdown(exec_ctx, args->endpoint); grpc_endpoint_destroy(exec_ctx, args->endpoint); - grpc_channel_args_destroy(args->args); - grpc_slice_buffer_destroy(args->read_buffer); + grpc_channel_args_destroy(exec_ctx, args->args); + grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer); gpr_free(args->read_buffer); } else { error = GRPC_ERROR_REF(error); @@ -151,16 +149,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, static void start_handshake_locked(grpc_exec_ctx *exec_ctx, chttp2_connector *c) { c->handshake_mgr = grpc_handshake_manager_create(); - char *proxy_name = grpc_get_http_proxy_server(); - if (proxy_name != NULL) { - grpc_handshake_manager_add(c->handshake_mgr, - grpc_http_connect_handshaker_create(proxy_name)); - gpr_free(proxy_name); - } - if (c->add_handshakers != NULL) { - c->add_handshakers(exec_ctx, c->add_handshakers_user_data, + grpc_handshakers_add(exec_ctx, HANDSHAKER_CLIENT, c->args.channel_args, c->handshake_mgr); - } grpc_handshake_manager_do_handshake( exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, c->args.deadline, NULL /* acceptor */, on_handshake_done, c); @@ -250,15 +240,11 @@ static const grpc_connector_vtable chttp2_connector_vtable = { chttp2_connector_ref, chttp2_connector_unref, chttp2_connector_shutdown, chttp2_connector_connect}; -grpc_connector *grpc_chttp2_connector_create( - grpc_exec_ctx *exec_ctx, grpc_chttp2_add_handshakers_func add_handshakers, - void *add_handshakers_user_data) { +grpc_connector *grpc_chttp2_connector_create() { chttp2_connector *c = gpr_malloc(sizeof(*c)); memset(c, 0, sizeof(*c)); c->base.vtable = &chttp2_connector_vtable; gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); - c->add_handshakers = add_handshakers; - c->add_handshakers_user_data = add_handshakers_user_data; return &c->base; } diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h index 58eba22417..f5d1025432 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.h +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h @@ -35,17 +35,7 @@ #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H #include "src/core/ext/client_channel/connector.h" -#include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/iomgr/exec_ctx.h" -typedef void (*grpc_chttp2_add_handshakers_func)( - grpc_exec_ctx* exec_ctx, void* user_data, - grpc_handshake_manager* handshake_mgr); - -/// If \a add_handshakers is non-NULL, it will be called with -/// \a add_handshakers_user_data to add handshakers. -grpc_connector* grpc_chttp2_connector_create( - grpc_exec_ctx* exec_ctx, grpc_chttp2_add_handshakers_func add_handshakers, - void* add_handshakers_user_data); +grpc_connector* grpc_chttp2_connector_create(); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */ diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index a0d0652ce7..c9f4021216 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -53,8 +53,7 @@ static void client_channel_factory_unref( static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const grpc_subchannel_args *args) { - grpc_connector *connector = grpc_chttp2_connector_create( - exec_ctx, NULL /* add_handshakers */, NULL /* user_data */); + grpc_connector *connector = grpc_chttp2_connector_create(); grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); grpc_connector_unref(exec_ctx, connector); return s; @@ -72,7 +71,7 @@ static grpc_channel *client_channel_factory_create_channel( grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args, GRPC_CLIENT_CHANNEL, NULL); - grpc_channel_args_destroy(new_args); + grpc_channel_args_destroy(exec_ctx, new_args); return channel; } @@ -96,17 +95,16 @@ grpc_channel *grpc_insecure_channel_create(const char *target, "grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3, (target, args, reserved)); GPR_ASSERT(reserved == NULL); - grpc_client_channel_factory *factory = - (grpc_client_channel_factory *)&client_channel_factory; // Add channel arg containing the client channel factory. - grpc_arg arg = grpc_client_channel_factory_create_channel_arg(factory); + grpc_arg arg = + grpc_client_channel_factory_create_channel_arg(&client_channel_factory); grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); // Create channel. grpc_channel *channel = client_channel_factory_create_channel( - &exec_ctx, factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args); + &exec_ctx, &client_channel_factory, target, + GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args); // Clean up. - grpc_channel_args_destroy(new_args); - grpc_client_channel_factory_unref(&exec_ctx, factory); + grpc_channel_args_destroy(&exec_ctx, new_args); grpc_exec_ctx_finish(&exec_ctx); return channel != NULL ? channel : grpc_lame_client_channel_create( target, GRPC_STATUS_INTERNAL, diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c index 1e5b1c22e3..f1069e2c57 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c @@ -74,7 +74,7 @@ grpc_channel *grpc_insecure_channel_create_from_fd( GPR_ASSERT(transport); grpc_channel *channel = grpc_channel_create( &exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); - grpc_channel_args_destroy(final_args); + grpc_channel_args_destroy(&exec_ctx, final_args); grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index f35439cd44..f979d9bad5 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -46,40 +46,16 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" -typedef struct { - grpc_client_channel_factory base; - gpr_refcount refs; - grpc_channel_security_connector *security_connector; -} client_channel_factory; - static void client_channel_factory_ref( - grpc_client_channel_factory *cc_factory) { - client_channel_factory *f = (client_channel_factory *)cc_factory; - gpr_ref(&f->refs); -} + grpc_client_channel_factory *cc_factory) {} static void client_channel_factory_unref( - grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) { - client_channel_factory *f = (client_channel_factory *)cc_factory; - if (gpr_unref(&f->refs)) { - GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, - "client_channel_factory"); - gpr_free(f); - } -} - -static void add_handshakers(grpc_exec_ctx *exec_ctx, void *security_connector, - grpc_handshake_manager *handshake_mgr) { - grpc_channel_security_connector_add_handshakers(exec_ctx, security_connector, - handshake_mgr); -} + grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {} static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const grpc_subchannel_args *args) { - client_channel_factory *f = (client_channel_factory *)cc_factory; - grpc_connector *connector = grpc_chttp2_connector_create( - exec_ctx, add_handshakers, f->security_connector); + grpc_connector *connector = grpc_chttp2_connector_create(); grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); grpc_connector_unref(exec_ctx, connector); return s; @@ -97,7 +73,7 @@ static grpc_channel *client_channel_factory_create_channel( grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args, GRPC_CLIENT_CHANNEL, NULL); - grpc_channel_args_destroy(new_args); + grpc_channel_args_destroy(exec_ctx, new_args); return channel; } @@ -106,6 +82,9 @@ static const grpc_client_channel_factory_vtable client_channel_factory_vtable = client_channel_factory_create_subchannel, client_channel_factory_create_channel}; +static grpc_client_channel_factory client_channel_factory = { + &client_channel_factory_vtable}; + /* Create a secure client channel: Asynchronously: - resolve target - connect to it (trying alternatives as presented) @@ -132,39 +111,32 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, grpc_channel_security_connector *security_connector; grpc_channel_args *new_args_from_connector; if (grpc_channel_credentials_create_security_connector( - creds, target, args, &security_connector, &new_args_from_connector) != - GRPC_SECURITY_OK) { + &exec_ctx, creds, target, args, &security_connector, + &new_args_from_connector) != GRPC_SECURITY_OK) { grpc_exec_ctx_finish(&exec_ctx); return grpc_lame_client_channel_create( target, GRPC_STATUS_INTERNAL, "Failed to create security connector."); } - // Create client channel factory. - client_channel_factory *f = gpr_malloc(sizeof(*f)); - memset(f, 0, sizeof(*f)); - f->base.vtable = &client_channel_factory_vtable; - gpr_ref_init(&f->refs, 1); - GRPC_SECURITY_CONNECTOR_REF(&security_connector->base, - "grpc_secure_channel_create"); - f->security_connector = security_connector; // Add channel args containing the client channel factory and security // connector. - grpc_arg new_args[2]; - new_args[0] = grpc_client_channel_factory_create_channel_arg(&f->base); - new_args[1] = grpc_security_connector_to_arg(&security_connector->base); - grpc_channel_args *args_copy = grpc_channel_args_copy_and_add( + grpc_arg args_to_add[2]; + args_to_add[0] = + grpc_client_channel_factory_create_channel_arg(&client_channel_factory); + args_to_add[1] = grpc_security_connector_to_arg(&security_connector->base); + grpc_channel_args *new_args = grpc_channel_args_copy_and_add( new_args_from_connector != NULL ? new_args_from_connector : args, - new_args, GPR_ARRAY_SIZE(new_args)); + args_to_add, GPR_ARRAY_SIZE(args_to_add)); if (new_args_from_connector != NULL) { - grpc_channel_args_destroy(new_args_from_connector); + grpc_channel_args_destroy(&exec_ctx, new_args_from_connector); } // Create channel. grpc_channel *channel = client_channel_factory_create_channel( - &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, args_copy); + &exec_ctx, &client_channel_factory, target, + GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args); // Clean up. - GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, + GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &security_connector->base, "secure_client_channel_factory_create_channel"); - grpc_channel_args_destroy(args_copy); - grpc_client_channel_factory_unref(&exec_ctx, &f->base); + grpc_channel_args_destroy(&exec_ctx, new_args); grpc_exec_ctx_finish(&exec_ctx); return channel; /* may be NULL */ } diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index 9af62d372e..574d1a7710 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -46,31 +46,15 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/tcp_server.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -void grpc_chttp2_server_handshaker_factory_add_handshakers( - grpc_exec_ctx *exec_ctx, - grpc_chttp2_server_handshaker_factory *handshaker_factory, - grpc_handshake_manager *handshake_mgr) { - if (handshaker_factory != NULL) { - handshaker_factory->vtable->add_handshakers(exec_ctx, handshaker_factory, - handshake_mgr); - } -} - -void grpc_chttp2_server_handshaker_factory_destroy( - grpc_exec_ctx *exec_ctx, - grpc_chttp2_server_handshaker_factory *handshaker_factory) { - if (handshaker_factory != NULL) { - handshaker_factory->vtable->destroy(exec_ctx, handshaker_factory); - } -} - typedef struct pending_handshake_manager_node { grpc_handshake_manager *handshake_mgr; struct pending_handshake_manager_node *next; @@ -80,7 +64,6 @@ typedef struct { grpc_server *server; grpc_tcp_server *tcp_server; grpc_channel_args *args; - grpc_chttp2_server_handshaker_factory *handshaker_factory; gpr_mu mu; bool shutdown; grpc_closure tcp_server_shutdown_complete; @@ -148,8 +131,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, // point this can be removed. grpc_endpoint_shutdown(exec_ctx, args->endpoint); grpc_endpoint_destroy(exec_ctx, args->endpoint); - grpc_channel_args_destroy(args->args); - grpc_slice_buffer_destroy(args->read_buffer); + grpc_channel_args_destroy(exec_ctx, args->args); + grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer); gpr_free(args->read_buffer); } } else { @@ -164,7 +147,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, connection_state->accepting_pollset, args->args); grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer); - grpc_channel_args_destroy(args->args); + grpc_channel_args_destroy(exec_ctx, args->args); } } pending_handshake_manager_remove_locked(connection_state->server_state, @@ -197,8 +180,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, connection_state->accepting_pollset = accepting_pollset; connection_state->acceptor = acceptor; connection_state->handshake_mgr = handshake_mgr; - grpc_chttp2_server_handshaker_factory_add_handshakers( - exec_ctx, state->handshaker_factory, connection_state->handshake_mgr); + grpc_handshakers_add(exec_ctx, HANDSHAKER_SERVER, state->args, + connection_state->handshake_mgr); // TODO(roth): We should really get this timeout value from channel // args instead of hard-coding it. const gpr_timespec deadline = gpr_time_add( @@ -232,13 +215,11 @@ static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, // Flush queued work before destroying handshaker factory, since that // may do a synchronous unref. grpc_exec_ctx_flush(exec_ctx); - grpc_chttp2_server_handshaker_factory_destroy(exec_ctx, - state->handshaker_factory); if (destroy_done != NULL) { destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error)); grpc_exec_ctx_flush(exec_ctx); } - grpc_channel_args_destroy(state->args); + grpc_channel_args_destroy(exec_ctx, state->args); gpr_mu_destroy(&state->mu); gpr_free(state); } @@ -258,10 +239,10 @@ static void server_destroy_listener(grpc_exec_ctx *exec_ctx, grpc_tcp_server_unref(exec_ctx, tcp_server); } -grpc_error *grpc_chttp2_server_add_port( - grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr, - grpc_channel_args *args, - grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num) { +grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx, + grpc_server *server, const char *addr, + grpc_channel_args *args, + int *port_num) { grpc_resolved_addresses *resolved = NULL; grpc_tcp_server *tcp_server = NULL; size_t i; @@ -292,7 +273,6 @@ grpc_error *grpc_chttp2_server_add_port( state->server = server; state->tcp_server = tcp_server; state->args = args; - state->handshaker_factory = handshaker_factory; state->shutdown = true; gpr_mu_init(&state->mu); @@ -346,8 +326,7 @@ error: if (tcp_server) { grpc_tcp_server_unref(exec_ctx, tcp_server); } else { - grpc_channel_args_destroy(args); - grpc_chttp2_server_handshaker_factory_destroy(exec_ctx, handshaker_factory); + grpc_channel_args_destroy(exec_ctx, args); gpr_free(state); } *port_num = 0; diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h index aa364b565d..2581ebaae9 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.h +++ b/src/core/ext/transport/chttp2/server/chttp2_server.h @@ -36,43 +36,12 @@ #include <grpc/impl/codegen/grpc_types.h> -#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/iomgr/exec_ctx.h" -/// A server handshaker factory is used to create handshakers for server -/// connections. -typedef struct grpc_chttp2_server_handshaker_factory - grpc_chttp2_server_handshaker_factory; - -typedef struct { - void (*add_handshakers)( - grpc_exec_ctx *exec_ctx, - grpc_chttp2_server_handshaker_factory *handshaker_factory, - grpc_handshake_manager *handshake_mgr); - void (*destroy)(grpc_exec_ctx *exec_ctx, - grpc_chttp2_server_handshaker_factory *handshaker_factory); -} grpc_chttp2_server_handshaker_factory_vtable; - -struct grpc_chttp2_server_handshaker_factory { - const grpc_chttp2_server_handshaker_factory_vtable *vtable; -}; - -void grpc_chttp2_server_handshaker_factory_add_handshakers( - grpc_exec_ctx *exec_ctx, - grpc_chttp2_server_handshaker_factory *handshaker_factory, - grpc_handshake_manager *handshake_mgr); - -void grpc_chttp2_server_handshaker_factory_destroy( - grpc_exec_ctx *exec_ctx, - grpc_chttp2_server_handshaker_factory *handshaker_factory); - /// Adds a port to \a server. Sets \a port_num to the port number. -/// If \a handshaker_factory is not NULL, it will be used to create -/// handshakers for the port. -/// Takes ownership of \a args and \a handshaker_factory. -grpc_error *grpc_chttp2_server_add_port( - grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr, - grpc_channel_args *args, - grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num); +/// Takes ownership of \a args. +grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx, + grpc_server *server, const char *addr, + grpc_channel_args *args, int *port_num); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H */ diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index 7e286d4e46..bf5026bea6 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -47,8 +47,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { (server, addr)); grpc_error *err = grpc_chttp2_server_add_port( &exec_ctx, server, addr, - grpc_channel_args_copy(grpc_server_get_channel_args(server)), - NULL /* handshaker_factory */, &port_num); + grpc_channel_args_copy(grpc_server_get_channel_args(server)), &port_num); if (err != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(err); gpr_log(GPR_ERROR, "%s", msg); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c index aa2ecf5743..f46e849932 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c @@ -62,7 +62,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server, grpc_endpoint *server_endpoint = grpc_tcp_create(grpc_fd_create(fd, name), resource_quota, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name); - grpc_resource_quota_internal_unref(&exec_ctx, resource_quota); + grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); gpr_free(name); diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index a33a7a3f7d..395c79a71d 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -49,34 +49,6 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct { - grpc_chttp2_server_handshaker_factory base; - grpc_server_security_connector *security_connector; -} server_security_handshaker_factory; - -static void server_security_handshaker_factory_add_handshakers( - grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf, - grpc_handshake_manager *handshake_mgr) { - server_security_handshaker_factory *handshaker_factory = - (server_security_handshaker_factory *)hf; - grpc_server_security_connector_add_handshakers( - exec_ctx, handshaker_factory->security_connector, handshake_mgr); -} - -static void server_security_handshaker_factory_destroy( - grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf) { - server_security_handshaker_factory *handshaker_factory = - (server_security_handshaker_factory *)hf; - GRPC_SECURITY_CONNECTOR_UNREF(&handshaker_factory->security_connector->base, - "server"); - gpr_free(hf); -} - -static const grpc_chttp2_server_handshaker_factory_vtable - server_security_handshaker_factory_vtable = { - server_security_handshaker_factory_add_handshakers, - server_security_handshaker_factory_destroy}; - int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_server_credentials *creds) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -94,7 +66,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, goto done; } grpc_security_status status = - grpc_server_credentials_create_security_connector(creds, &sc); + grpc_server_credentials_create_security_connector(&exec_ctx, creds, &sc); if (status != GRPC_SECURITY_OK) { char *msg; gpr_asprintf(&msg, @@ -105,20 +77,19 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, gpr_free(msg); goto done; } - // Create handshaker factory. - server_security_handshaker_factory *handshaker_factory = - gpr_malloc(sizeof(*handshaker_factory)); - memset(handshaker_factory, 0, sizeof(*handshaker_factory)); - handshaker_factory->base.vtable = &server_security_handshaker_factory_vtable; - handshaker_factory->security_connector = sc; // Create channel args. - grpc_arg channel_arg = grpc_server_credentials_to_arg(creds); - grpc_channel_args *args = grpc_channel_args_copy_and_add( - grpc_server_get_channel_args(server), &channel_arg, 1); + grpc_arg args_to_add[2]; + args_to_add[0] = grpc_server_credentials_to_arg(creds); + args_to_add[1] = grpc_security_connector_to_arg(&sc->base); + grpc_channel_args *args = + grpc_channel_args_copy_and_add(grpc_server_get_channel_args(server), + args_to_add, GPR_ARRAY_SIZE(args_to_add)); // Add server port. - err = grpc_chttp2_server_add_port(&exec_ctx, server, addr, args, - &handshaker_factory->base, &port_num); + err = grpc_chttp2_server_add_port(&exec_ctx, server, addr, args, &port_num); done: + if (sc != NULL) { + GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &sc->base, "server"); + } grpc_exec_ctx_finish(&exec_ctx); if (err != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(err); diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.c b/src/core/ext/transport/chttp2/transport/bin_decoder.c index 3eef80b557..8db36e4a7f 100644 --- a/src/core/ext/transport/chttp2/transport/bin_decoder.c +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.c @@ -34,6 +34,7 @@ #include "src/core/ext/transport/chttp2/transport/bin_decoder.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" @@ -143,7 +144,8 @@ bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx) { return true; } -grpc_slice grpc_chttp2_base64_decode(grpc_slice input) { +grpc_slice grpc_chttp2_base64_decode(grpc_exec_ctx *exec_ctx, + grpc_slice input) { size_t input_length = GRPC_SLICE_LENGTH(input); size_t output_length = input_length / 4 * 3; struct grpc_base64_decode_context ctx; @@ -179,7 +181,7 @@ grpc_slice grpc_chttp2_base64_decode(grpc_slice input) { char *s = grpc_dump_slice(input, GPR_DUMP_ASCII); gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s); gpr_free(s); - grpc_slice_unref(output); + grpc_slice_unref_internal(exec_ctx, output); return gpr_empty_slice(); } GPR_ASSERT(ctx.output_cur == GRPC_SLICE_END_PTR(output)); @@ -187,7 +189,8 @@ grpc_slice grpc_chttp2_base64_decode(grpc_slice input) { return output; } -grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input, +grpc_slice grpc_chttp2_base64_decode_with_length(grpc_exec_ctx *exec_ctx, + grpc_slice input, size_t output_length) { size_t input_length = GRPC_SLICE_LENGTH(input); grpc_slice output = grpc_slice_malloc(output_length); @@ -200,7 +203,7 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input, "grpc_chttp2_base64_decode_with_length has a length of %d, which " "has a tail of 1 byte.\n", (int)input_length); - grpc_slice_unref(output); + grpc_slice_unref_internal(exec_ctx, output); return gpr_empty_slice(); } @@ -210,7 +213,7 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input, "than the max possible output length %d.\n", (int)output_length, (int)(input_length / 4 * 3 + tail_xtra[input_length % 4])); - grpc_slice_unref(output); + grpc_slice_unref_internal(exec_ctx, output); return gpr_empty_slice(); } @@ -224,7 +227,7 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input, char *s = grpc_dump_slice(input, GPR_DUMP_ASCII); gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s); gpr_free(s); - grpc_slice_unref(output); + grpc_slice_unref_internal(exec_ctx, output); return gpr_empty_slice(); } GPR_ASSERT(ctx.output_cur == GRPC_SLICE_END_PTR(output)); diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.h b/src/core/ext/transport/chttp2/transport/bin_decoder.h index 83a90be519..48ffb2ae3b 100644 --- a/src/core/ext/transport/chttp2/transport/bin_decoder.h +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.h @@ -55,12 +55,13 @@ bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx); /* base64 decode a slice with pad chars. Returns a new slice, does not take ownership of the input. Returns an empty slice if decoding is failed. */ -grpc_slice grpc_chttp2_base64_decode(grpc_slice input); +grpc_slice grpc_chttp2_base64_decode(grpc_exec_ctx *exec_ctx, grpc_slice input); /* base64 decode a slice without pad chars, data length is needed. Returns a new slice, does not take ownership of the input. Returns an empty slice if decoding is failed. */ -grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input, +grpc_slice grpc_chttp2_base64_decode_with_length(grpc_exec_ctx *exec_ctx, + grpc_slice input, size_t output_length); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H */ diff --git a/src/core/ext/transport/chttp2/transport/bin_encoder.h b/src/core/ext/transport/chttp2/transport/bin_encoder.h index 9e143b46e2..477559d0e2 100644 --- a/src/core/ext/transport/chttp2/transport/bin_encoder.h +++ b/src/core/ext/transport/chttp2/transport/bin_encoder.h @@ -47,7 +47,7 @@ grpc_slice grpc_chttp2_huffman_compress(grpc_slice input); /* equivalent to: grpc_slice x = grpc_chttp2_base64_encode(input); grpc_slice y = grpc_chttp2_huffman_compress(x); - grpc_slice_unref(x); + grpc_slice_unref_internal(exec_ctx, x); return y; */ grpc_slice grpc_chttp2_base64_encode_and_huffman_compress_impl( grpc_slice input); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 558d588b40..f297dd91a3 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -47,10 +47,12 @@ #include "src/core/ext/transport/chttp2/transport/http2_errors.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/status_conversion.h" +#include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" @@ -146,13 +148,13 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_endpoint_destroy(exec_ctx, t->ep); - grpc_slice_buffer_destroy(&t->qbuf); + grpc_slice_buffer_destroy_internal(exec_ctx, &t->qbuf); - grpc_slice_buffer_destroy(&t->outbuf); - grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); + grpc_slice_buffer_destroy_internal(exec_ctx, &t->outbuf); + grpc_chttp2_hpack_compressor_destroy(exec_ctx, &t->hpack_compressor); - grpc_slice_buffer_destroy(&t->read_buffer); - grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); + grpc_slice_buffer_destroy_internal(exec_ctx, &t->read_buffer); + grpc_chttp2_hpack_parser_destroy(exec_ctx, &t->hpack_parser); grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); for (i = 0; i < STREAM_LIST_COUNT; i++) { @@ -266,7 +268,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, .integral_range = 10}); grpc_chttp2_goaway_parser_init(&t->goaway_parser); - grpc_chttp2_hpack_parser_init(&t->hpack_parser); + grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser); grpc_slice_buffer_init(&t->read_buffer); @@ -544,9 +546,11 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_ASSERT(s->recv_message_ready == NULL); GPR_ASSERT(s->recv_trailing_metadata_finished == NULL); grpc_chttp2_data_parser_destroy(exec_ctx, &s->data_parser); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[1]); - grpc_slice_buffer_destroy(&s->flow_controlled_buffer); + grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx, + &s->metadata_buffer[0]); + grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx, + &s->metadata_buffer[1]); + grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer); GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->write_closed_error); @@ -781,7 +785,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, char *msg = grpc_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII); GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg)); - grpc_slice_unref(goaway_text); + grpc_slice_unref_internal(exec_ctx, goaway_text); t->seen_goaway = 1; /* lie: use transient failure from the transport to indicate goaway has been * received */ @@ -1295,7 +1299,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, if (op->send_goaway) { send_goaway(exec_ctx, t, grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), - grpc_slice_ref(*op->goaway_message)); + grpc_slice_ref_internal(*op->goaway_message)); } if (op->set_accept_stream) { @@ -1527,21 +1531,22 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, char status_string[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(status, status_string); grpc_chttp2_incoming_metadata_buffer_add( - &s->metadata_buffer[1], - grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_GRPC_STATUS, grpc_mdstr_from_string(status_string))); + &s->metadata_buffer[1], grpc_mdelem_from_metadata_strings( + exec_ctx, GRPC_MDSTR_GRPC_STATUS, + grpc_mdstr_from_string(status_string))); if (slice) { grpc_chttp2_incoming_metadata_buffer_add( &s->metadata_buffer[1], grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_GRPC_MESSAGE, - grpc_mdstr_from_slice(grpc_slice_ref(*slice)))); + exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_mdstr_from_slice(exec_ctx, + grpc_slice_ref_internal(*slice)))); } s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } if (slice) { - grpc_slice_unref(*slice); + grpc_slice_unref_internal(exec_ctx, *slice); } } @@ -1697,8 +1702,9 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (optional_message != NULL) { size_t msg_len = strlen(optional_message); - GPR_ASSERT(msg_len < 127); - message_pfx = grpc_slice_malloc(15); + GPR_ASSERT(msg_len <= UINT32_MAX); + uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 0); + message_pfx = grpc_slice_malloc(14 + msg_len_len); p = GRPC_SLICE_START_PTR(message_pfx); *p++ = 0x40; *p++ = 12; /* len(grpc-message) */ @@ -1714,7 +1720,9 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, *p++ = 'a'; *p++ = 'g'; *p++ = 'e'; - *p++ = (uint8_t)msg_len; + GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 0, 0, p, + (uint32_t)msg_len_len); + p += msg_len_len; GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx)); len += (uint32_t)GRPC_SLICE_LENGTH(message_pfx); len += (uint32_t)msg_len; @@ -1906,7 +1914,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, keep_reading = true; GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading"); } - grpc_slice_buffer_reset_and_unref(&t->read_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->read_buffer); if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, @@ -2013,7 +2021,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) { if (gpr_unref(&bs->refs)) { GRPC_ERROR_UNREF(bs->error); - grpc_slice_buffer_destroy(&bs->slices); + grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices); gpr_mu_destroy(&bs->slice_mu); gpr_free(bs); } diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c index b4c5ed769b..20043f5fbf 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c @@ -109,7 +109,7 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(grpc_exec_ctx *exec_ctx, (((uint32_t)p->reason_bytes[2]) << 8) | (((uint32_t)p->reason_bytes[3])); grpc_error *error = GRPC_ERROR_NONE; - if (reason != GRPC_CHTTP2_NO_ERROR) { + if (reason != GRPC_CHTTP2_NO_ERROR || s->header_frames_received < 2) { error = grpc_error_set_int(GRPC_ERROR_CREATE("RST_STREAM"), GRPC_ERROR_INT_HTTP2_ERROR, (intptr_t)reason); grpc_status_code status_code = grpc_chttp2_http2_error_to_grpc_status( diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index eb68fe3138..49a8326f62 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -48,6 +48,7 @@ #include "src/core/ext/transport/chttp2/transport/bin_encoder.h" #include "src/core/ext/transport/chttp2/transport/hpack_table.h" #include "src/core/ext/transport/chttp2/transport/varint.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/timeout_encoding.h" @@ -183,7 +184,8 @@ static void evict_entry(grpc_chttp2_hpack_compressor *c) { } /* add an element to the decoder table */ -static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) { +static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, + grpc_mdelem *elem) { uint32_t key_hash = elem->key->hash; uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash); uint32_t new_index = c->tail_remote_index + c->table_elems + 1; @@ -227,12 +229,12 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) { } else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] < c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) { /* not there: replace oldest */ - GRPC_MDELEM_UNREF(c->entries_elems[HASH_FRAGMENT_2(elem_hash)]); + GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_2(elem_hash)]); c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem); c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; } else { /* not there: replace oldest */ - GRPC_MDELEM_UNREF(c->entries_elems[HASH_FRAGMENT_3(elem_hash)]); + GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_3(elem_hash)]); c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem); c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; } @@ -251,11 +253,11 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) { c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index; } else if (c->indices_keys[HASH_FRAGMENT_2(key_hash)] < c->indices_keys[HASH_FRAGMENT_3(key_hash)]) { - GRPC_MDSTR_UNREF(c->entries_keys[HASH_FRAGMENT_2(key_hash)]); + GRPC_MDSTR_UNREF(exec_ctx, c->entries_keys[HASH_FRAGMENT_2(key_hash)]); c->entries_keys[HASH_FRAGMENT_2(key_hash)] = GRPC_MDSTR_REF(elem->key); c->indices_keys[HASH_FRAGMENT_2(key_hash)] = new_index; } else { - GRPC_MDSTR_UNREF(c->entries_keys[HASH_FRAGMENT_3(key_hash)]); + GRPC_MDSTR_UNREF(exec_ctx, c->entries_keys[HASH_FRAGMENT_3(key_hash)]); c->entries_keys[HASH_FRAGMENT_3(key_hash)] = GRPC_MDSTR_REF(elem->key); c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index; } @@ -294,7 +296,7 @@ static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c, add_tiny_header_data(st, len_pfx), len_pfx); GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); - add_header_data(st, grpc_slice_ref(value_slice)); + add_header_data(st, grpc_slice_ref_internal(value_slice)); } static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c, @@ -311,7 +313,7 @@ static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c, add_tiny_header_data(st, len_pfx), len_pfx); GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); - add_header_data(st, grpc_slice_ref(value_slice)); + add_header_data(st, grpc_slice_ref_internal(value_slice)); } static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c, @@ -327,10 +329,10 @@ static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c, *add_tiny_header_data(st, 1) = 0x40; GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00, add_tiny_header_data(st, len_key_len), len_key_len); - add_header_data(st, grpc_slice_ref(elem->key->slice)); + add_header_data(st, grpc_slice_ref_internal(elem->key->slice)); GRPC_CHTTP2_WRITE_VARINT(len_val, 1, huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); - add_header_data(st, grpc_slice_ref(value_slice)); + add_header_data(st, grpc_slice_ref_internal(value_slice)); } static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c, @@ -346,10 +348,10 @@ static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c, *add_tiny_header_data(st, 1) = 0x00; GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00, add_tiny_header_data(st, len_key_len), len_key_len); - add_header_data(st, grpc_slice_ref(elem->key->slice)); + add_header_data(st, grpc_slice_ref_internal(elem->key->slice)); GRPC_CHTTP2_WRITE_VARINT(len_val, 1, huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); - add_header_data(st, grpc_slice_ref(value_slice)); + add_header_data(st, grpc_slice_ref_internal(value_slice)); } static void emit_advertise_table_size_change(grpc_chttp2_hpack_compressor *c, @@ -366,8 +368,8 @@ static uint32_t dynidx(grpc_chttp2_hpack_compressor *c, uint32_t elem_index) { } /* encode an mdelem */ -static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, - framer_state *st) { +static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, + grpc_mdelem *elem, framer_state *st) { uint32_t key_hash = elem->key->hash; uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash); size_t decoder_space_usage; @@ -417,7 +419,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, /* HIT: key (first cuckoo hash) */ if (should_add_elem) { emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st); - add_elem(c, elem); + add_elem(exec_ctx, c, elem); return; } else { emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st); @@ -432,7 +434,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, /* HIT: key (first cuckoo hash) */ if (should_add_elem) { emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st); - add_elem(c, elem); + add_elem(exec_ctx, c, elem); return; } else { emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st); @@ -445,7 +447,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, if (should_add_elem) { emit_lithdr_incidx_v(c, elem, st); - add_elem(c, elem); + add_elem(exec_ctx, c, elem); return; } else { emit_lithdr_noidx_v(c, elem, st); @@ -457,16 +459,17 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, #define STRLEN_LIT(x) (sizeof(x) - 1) #define TIMEOUT_KEY "grpc-timeout" -static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline, +static void deadline_enc(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_compressor *c, gpr_timespec deadline, framer_state *st) { char timeout_str[GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE]; grpc_mdelem *mdelem; grpc_http2_encode_timeout( gpr_time_sub(deadline, gpr_now(deadline.clock_type)), timeout_str); mdelem = grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_GRPC_TIMEOUT, grpc_mdstr_from_string(timeout_str)); - hpack_enc(c, mdelem, st); - GRPC_MDELEM_UNREF(mdelem); + exec_ctx, GRPC_MDSTR_GRPC_TIMEOUT, grpc_mdstr_from_string(timeout_str)); + hpack_enc(exec_ctx, c, mdelem, st); + GRPC_MDELEM_UNREF(exec_ctx, mdelem); } static uint32_t elems_for_bytes(uint32_t bytes) { return (bytes + 31) / 32; } @@ -483,11 +486,12 @@ void grpc_chttp2_hpack_compressor_init(grpc_chttp2_hpack_compressor *c) { sizeof(*c->table_elem_size) * c->cap_table_elems); } -void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c) { +void grpc_chttp2_hpack_compressor_destroy(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_compressor *c) { int i; for (i = 0; i < GRPC_CHTTP2_HPACKC_NUM_VALUES; i++) { - if (c->entries_keys[i]) GRPC_MDSTR_UNREF(c->entries_keys[i]); - if (c->entries_elems[i]) GRPC_MDELEM_UNREF(c->entries_elems[i]); + if (c->entries_keys[i]) GRPC_MDSTR_UNREF(exec_ctx, c->entries_keys[i]); + if (c->entries_elems[i]) GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[i]); } gpr_free(c->table_elem_size); } @@ -542,7 +546,8 @@ void grpc_chttp2_hpack_compressor_set_max_table_size( } } -void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, +void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_compressor *c, uint32_t stream_id, grpc_metadata_batch *metadata, int is_eof, size_t max_frame_size, @@ -571,11 +576,11 @@ void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, } grpc_metadata_batch_assert_ok(metadata); for (l = metadata->list.head; l; l = l->next) { - hpack_enc(c, l->md, &st); + hpack_enc(exec_ctx, c, l->md, &st); } deadline = metadata->deadline; if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) { - deadline_enc(c, deadline, &st); + deadline_enc(exec_ctx, c, deadline, &st); } finish_frame(&st, 1, is_eof); diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.h b/src/core/ext/transport/chttp2/transport/hpack_encoder.h index bcbd675ca2..3a35496ec8 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.h +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.h @@ -83,13 +83,15 @@ typedef struct { } grpc_chttp2_hpack_compressor; void grpc_chttp2_hpack_compressor_init(grpc_chttp2_hpack_compressor *c); -void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c); +void grpc_chttp2_hpack_compressor_destroy(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_compressor *c); void grpc_chttp2_hpack_compressor_set_max_table_size( grpc_chttp2_hpack_compressor *c, uint32_t max_table_size); void grpc_chttp2_hpack_compressor_set_max_usable_size( grpc_chttp2_hpack_compressor *c, uint32_t max_table_size); -void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, uint32_t id, +void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_compressor *c, uint32_t id, grpc_metadata_batch *metadata, int is_eof, size_t max_frame_size, grpc_transport_one_way_stats *stats, diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 64ce07b2f7..8b91cc760b 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -670,11 +670,11 @@ static const uint8_t inverse_base64[256] = { static grpc_error *on_hdr(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, grpc_mdelem *md, int add_to_table) { if (add_to_table) { - grpc_error *err = grpc_chttp2_hptbl_add(&p->table, md); + grpc_error *err = grpc_chttp2_hptbl_add(exec_ctx, &p->table, md); if (err != GRPC_ERROR_NONE) return err; } if (p->on_header == NULL) { - GRPC_MDELEM_UNREF(md); + GRPC_MDELEM_UNREF(exec_ctx, md); return GRPC_ERROR_CREATE("on_header callback not set"); } p->on_header(exec_ctx, p->on_header_user_data, md); @@ -815,10 +815,10 @@ static grpc_error *finish_lithdr_incidx(grpc_exec_ctx *exec_ctx, const uint8_t *end) { grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index); GPR_ASSERT(md != NULL); /* handled in string parsing */ - grpc_error *err = on_hdr( - exec_ctx, p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key), - take_string(p, &p->value)), - 1); + grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings( + exec_ctx, GRPC_MDSTR_REF(md->key), + take_string(p, &p->value)), + 1); if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err); return parse_begin(exec_ctx, p, cur, end); } @@ -828,10 +828,10 @@ static grpc_error *finish_lithdr_incidx_v(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end) { - grpc_error *err = on_hdr( - exec_ctx, p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key), - take_string(p, &p->value)), - 1); + grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings( + exec_ctx, take_string(p, &p->key), + take_string(p, &p->value)), + 1); if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err); return parse_begin(exec_ctx, p, cur, end); } @@ -883,10 +883,10 @@ static grpc_error *finish_lithdr_notidx(grpc_exec_ctx *exec_ctx, const uint8_t *end) { grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index); GPR_ASSERT(md != NULL); /* handled in string parsing */ - grpc_error *err = on_hdr( - exec_ctx, p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key), - take_string(p, &p->value)), - 0); + grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings( + exec_ctx, GRPC_MDSTR_REF(md->key), + take_string(p, &p->value)), + 0); if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err); return parse_begin(exec_ctx, p, cur, end); } @@ -896,10 +896,10 @@ static grpc_error *finish_lithdr_notidx_v(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end) { - grpc_error *err = on_hdr( - exec_ctx, p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key), - take_string(p, &p->value)), - 0); + grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings( + exec_ctx, take_string(p, &p->key), + take_string(p, &p->value)), + 0); if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err); return parse_begin(exec_ctx, p, cur, end); } @@ -951,10 +951,10 @@ static grpc_error *finish_lithdr_nvridx(grpc_exec_ctx *exec_ctx, const uint8_t *end) { grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index); GPR_ASSERT(md != NULL); /* handled in string parsing */ - grpc_error *err = on_hdr( - exec_ctx, p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key), - take_string(p, &p->value)), - 0); + grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings( + exec_ctx, GRPC_MDSTR_REF(md->key), + take_string(p, &p->value)), + 0); if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err); return parse_begin(exec_ctx, p, cur, end); } @@ -964,10 +964,10 @@ static grpc_error *finish_lithdr_nvridx_v(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end) { - grpc_error *err = on_hdr( - exec_ctx, p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key), - take_string(p, &p->value)), - 0); + grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings( + exec_ctx, take_string(p, &p->key), + take_string(p, &p->value)), + 0); if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err); return parse_begin(exec_ctx, p, cur, end); } @@ -1020,7 +1020,7 @@ static grpc_error *finish_max_tbl_size(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "MAX TABLE SIZE: %d", p->index); } grpc_error *err = - grpc_chttp2_hptbl_set_current_table_size(&p->table, p->index); + grpc_chttp2_hptbl_set_current_table_size(exec_ctx, &p->table, p->index); if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err); return parse_begin(exec_ctx, p, cur, end); } @@ -1534,7 +1534,8 @@ static grpc_error *parse_value_string_with_literal_key( /* PUBLIC INTERFACE */ -void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p) { +void grpc_chttp2_hpack_parser_init(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_parser *p) { p->on_header = NULL; p->on_header_user_data = NULL; p->state = parse_begin; @@ -1546,7 +1547,7 @@ void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p) { p->value.length = 0; p->dynamic_table_update_allowed = 2; p->last_error = GRPC_ERROR_NONE; - grpc_chttp2_hptbl_init(&p->table); + grpc_chttp2_hptbl_init(exec_ctx, &p->table); } void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p) { @@ -1554,8 +1555,9 @@ void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p) { p->state = parse_stream_dep0; } -void grpc_chttp2_hpack_parser_destroy(grpc_chttp2_hpack_parser *p) { - grpc_chttp2_hptbl_destroy(&p->table); +void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_parser *p) { + grpc_chttp2_hptbl_destroy(exec_ctx, &p->table); GRPC_ERROR_UNREF(p->last_error); gpr_free(p->key.str); gpr_free(p->value.str); diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.h b/src/core/ext/transport/chttp2/transport/hpack_parser.h index a39bf466cd..52ccf1e7a7 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.h +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.h @@ -99,8 +99,10 @@ struct grpc_chttp2_hpack_parser { grpc_chttp2_hptbl table; }; -void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p); -void grpc_chttp2_hpack_parser_destroy(grpc_chttp2_hpack_parser *p); +void grpc_chttp2_hpack_parser_init(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_parser *p); +void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_parser *p); void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p); diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.c b/src/core/ext/transport/chttp2/transport/hpack_table.c index 2dc793d304..26d4036d49 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_table.c +++ b/src/core/ext/transport/chttp2/transport/hpack_table.c @@ -179,7 +179,7 @@ static uint32_t entries_for_bytes(uint32_t bytes) { GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD; } -void grpc_chttp2_hptbl_init(grpc_chttp2_hptbl *tbl) { +void grpc_chttp2_hptbl_init(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl) { size_t i; memset(tbl, 0, sizeof(*tbl)); @@ -190,18 +190,20 @@ void grpc_chttp2_hptbl_init(grpc_chttp2_hptbl *tbl) { tbl->ents = gpr_malloc(sizeof(*tbl->ents) * tbl->cap_entries); memset(tbl->ents, 0, sizeof(*tbl->ents) * tbl->cap_entries); for (i = 1; i <= GRPC_CHTTP2_LAST_STATIC_ENTRY; i++) { - tbl->static_ents[i - 1] = - grpc_mdelem_from_strings(static_table[i].key, static_table[i].value); + tbl->static_ents[i - 1] = grpc_mdelem_from_strings( + exec_ctx, static_table[i].key, static_table[i].value); } } -void grpc_chttp2_hptbl_destroy(grpc_chttp2_hptbl *tbl) { +void grpc_chttp2_hptbl_destroy(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hptbl *tbl) { size_t i; for (i = 0; i < GRPC_CHTTP2_LAST_STATIC_ENTRY; i++) { - GRPC_MDELEM_UNREF(tbl->static_ents[i]); + GRPC_MDELEM_UNREF(exec_ctx, tbl->static_ents[i]); } for (i = 0; i < tbl->num_ents; i++) { - GRPC_MDELEM_UNREF(tbl->ents[(tbl->first_ent + i) % tbl->cap_entries]); + GRPC_MDELEM_UNREF(exec_ctx, + tbl->ents[(tbl->first_ent + i) % tbl->cap_entries]); } gpr_free(tbl->ents); } @@ -224,7 +226,7 @@ grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl, } /* Evict one element from the table */ -static void evict1(grpc_chttp2_hptbl *tbl) { +static void evict1(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl) { grpc_mdelem *first_ent = tbl->ents[tbl->first_ent]; size_t elem_bytes = GRPC_SLICE_LENGTH(first_ent->key->slice) + GRPC_SLICE_LENGTH(first_ent->value->slice) + @@ -233,7 +235,7 @@ static void evict1(grpc_chttp2_hptbl *tbl) { tbl->mem_used -= (uint32_t)elem_bytes; tbl->first_ent = ((tbl->first_ent + 1) % tbl->cap_entries); tbl->num_ents--; - GRPC_MDELEM_UNREF(first_ent); + GRPC_MDELEM_UNREF(exec_ctx, first_ent); } static void rebuild_ents(grpc_chttp2_hptbl *tbl, uint32_t new_cap) { @@ -249,7 +251,8 @@ static void rebuild_ents(grpc_chttp2_hptbl *tbl, uint32_t new_cap) { tbl->first_ent = 0; } -void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl, +void grpc_chttp2_hptbl_set_max_bytes(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hptbl *tbl, uint32_t max_bytes) { if (tbl->max_bytes == max_bytes) { return; @@ -258,12 +261,13 @@ void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl, gpr_log(GPR_DEBUG, "Update hpack parser max size to %d", max_bytes); } while (tbl->mem_used > max_bytes) { - evict1(tbl); + evict1(exec_ctx, tbl); } tbl->max_bytes = max_bytes; } -grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl, +grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hptbl *tbl, uint32_t bytes) { if (tbl->current_table_bytes == bytes) { return GRPC_ERROR_NONE; @@ -281,7 +285,7 @@ grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl, gpr_log(GPR_DEBUG, "Update hpack parser table size to %d", bytes); } while (tbl->mem_used > bytes) { - evict1(tbl); + evict1(exec_ctx, tbl); } tbl->current_table_bytes = bytes; tbl->max_entries = entries_for_bytes(bytes); @@ -296,7 +300,8 @@ grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl, return GRPC_ERROR_NONE; } -grpc_error *grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) { +grpc_error *grpc_chttp2_hptbl_add(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hptbl *tbl, grpc_mdelem *md) { /* determine how many bytes of buffer this entry represents */ size_t elem_bytes = GRPC_SLICE_LENGTH(md->key->slice) + GRPC_SLICE_LENGTH(md->value->slice) + @@ -326,14 +331,14 @@ grpc_error *grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) { * empty table. */ while (tbl->num_ents) { - evict1(tbl); + evict1(exec_ctx, tbl); } return GRPC_ERROR_NONE; } /* evict entries to ensure no overflow */ while (elem_bytes > (size_t)tbl->current_table_bytes - tbl->mem_used) { - evict1(tbl); + evict1(exec_ctx, tbl); } /* copy the finalized entry in */ diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.h b/src/core/ext/transport/chttp2/transport/hpack_table.h index 2ca130e64b..144574ef06 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_table.h +++ b/src/core/ext/transport/chttp2/transport/hpack_table.h @@ -84,18 +84,21 @@ typedef struct { } grpc_chttp2_hptbl; /* initialize a hpack table */ -void grpc_chttp2_hptbl_init(grpc_chttp2_hptbl *tbl); -void grpc_chttp2_hptbl_destroy(grpc_chttp2_hptbl *tbl); -void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl, +void grpc_chttp2_hptbl_init(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl); +void grpc_chttp2_hptbl_destroy(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl); +void grpc_chttp2_hptbl_set_max_bytes(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hptbl *tbl, uint32_t max_bytes); -grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl, +grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hptbl *tbl, uint32_t bytes); /* lookup a table entry based on its hpack index */ grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl, uint32_t index); /* add a table entry to the index */ -grpc_error *grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, +grpc_error *grpc_chttp2_hptbl_add(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hptbl *tbl, grpc_mdelem *md) GRPC_MUST_USE_RESULT; /* Find a key/value pair in the table... returns the index in the table of the most similar entry, or 0 if the value was not found */ diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index 3e463a7995..5d1094999c 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -46,11 +46,11 @@ void grpc_chttp2_incoming_metadata_buffer_init( } void grpc_chttp2_incoming_metadata_buffer_destroy( - grpc_chttp2_incoming_metadata_buffer *buffer) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) { size_t i; if (!buffer->published) { for (i = 0; i < buffer->count; i++) { - GRPC_MDELEM_UNREF(buffer->elems[i].md); + GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); } } gpr_free(buffer->elems); diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h index df4343b93e..7a0c4da15f 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h @@ -49,7 +49,7 @@ typedef struct { void grpc_chttp2_incoming_metadata_buffer_init( grpc_chttp2_incoming_metadata_buffer *buffer); void grpc_chttp2_incoming_metadata_buffer_destroy( - grpc_chttp2_incoming_metadata_buffer *buffer); + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer); void grpc_chttp2_incoming_metadata_buffer_publish( grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch); diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index b601cf282d..4373391e44 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -336,7 +336,7 @@ static grpc_error *skip_parser(grpc_exec_ctx *exec_ctx, void *parser, } static void skip_header(grpc_exec_ctx *exec_ctx, void *tp, grpc_mdelem *md) { - GRPC_MDELEM_UNREF(md); + GRPC_MDELEM_UNREF(exec_ctx, md); } static grpc_error *init_skip_frame_parser(grpc_exec_ctx *exec_ctx, @@ -482,7 +482,7 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, grpc_chttp2_incoming_metadata_buffer_set_deadline( &s->metadata_buffer[0], gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), *cached_timeout)); - GRPC_MDELEM_UNREF(md); + GRPC_MDELEM_UNREF(exec_ctx, md); } else { const size_t new_size = s->metadata_buffer[0].size + GRPC_MDELEM_LENGTH(md); const size_t metadata_size_limit = @@ -500,7 +500,7 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); s->seen_error = true; - GRPC_MDELEM_UNREF(md); + GRPC_MDELEM_UNREF(exec_ctx, md); } else { grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md); } @@ -543,7 +543,7 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); s->seen_error = true; - GRPC_MDELEM_UNREF(md); + GRPC_MDELEM_UNREF(exec_ctx, md); } else { grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md); } @@ -717,7 +717,7 @@ static grpc_error *init_settings_frame_parser(grpc_exec_ctx *exec_ctx, memcpy(t->settings[GRPC_ACKED_SETTINGS], t->settings[GRPC_SENT_SETTINGS], GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t)); grpc_chttp2_hptbl_set_max_bytes( - &t->hpack_parser.table, + exec_ctx, &t->hpack_parser.table, t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); t->sent_local_settings = 0; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 57648a704d..6ef3b147be 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -39,6 +39,7 @@ #include "src/core/ext/transport/chttp2/transport/http2_errors.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_internal.h" static void add_to_write_list(grpc_chttp2_write_cb **list, grpc_chttp2_write_cb *cb) { @@ -172,7 +173,7 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, /* send initial metadata if it's available */ if (!sent_initial_metadata && s->send_initial_metadata) { grpc_chttp2_encode_header( - &t->hpack_compressor, s->id, s->send_initial_metadata, 0, + exec_ctx, &t->hpack_compressor, s->id, s->send_initial_metadata, 0, t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], &s->stats.outgoing, &t->outbuf); s->send_initial_metadata = NULL; @@ -251,9 +252,9 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, &s->stats.outgoing, &t->outbuf); } else { grpc_chttp2_encode_header( - &t->hpack_compressor, s->id, s->send_trailing_metadata, true, - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], + exec_ctx, &t->hpack_compressor, s->id, s->send_trailing_metadata, + true, t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], &s->stats.outgoing, &t->outbuf); } s->send_trailing_metadata = NULL; @@ -325,7 +326,7 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end"); } - grpc_slice_buffer_reset_and_unref(&t->outbuf); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->outbuf); GRPC_ERROR_UNREF(error); GPR_TIMER_END("grpc_chttp2_end_write", 0); } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index df0a769f6c..296c406324 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -427,6 +427,7 @@ static void on_response_headers_received( cronet_bidirectional_stream *stream, const cronet_bidirectional_stream_header_array *headers, const char *negotiated_protocol) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, headers, negotiated_protocol); stream_obj *s = (stream_obj *)stream->annotation; @@ -438,7 +439,7 @@ static void on_response_headers_received( grpc_chttp2_incoming_metadata_buffer_add( &s->state.rs.initial_metadata, grpc_mdelem_from_metadata_strings( - grpc_mdstr_from_string(headers->headers[i].key), + &exec_ctx, grpc_mdstr_from_string(headers->headers[i].key), grpc_mdstr_from_string(headers->headers[i].value))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; @@ -455,6 +456,7 @@ static void on_response_headers_received( s->state.rs.remaining_bytes); } gpr_mu_unlock(&s->mu); + grpc_exec_ctx_finish(&exec_ctx); execute_from_storage(s); } @@ -520,6 +522,7 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data, static void on_response_trailers_received( cronet_bidirectional_stream *stream, const cronet_bidirectional_stream_header_array *trailers) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, trailers); stream_obj *s = (stream_obj *)stream->annotation; @@ -534,7 +537,7 @@ static void on_response_trailers_received( grpc_chttp2_incoming_metadata_buffer_add( &s->state.rs.trailing_metadata, grpc_mdelem_from_metadata_strings( - grpc_mdstr_from_string(trailers->headers[i].key), + &exec_ctx, grpc_mdstr_from_string(trailers->headers[i].key), grpc_mdstr_from_string(trailers->headers[i].value))); s->state.rs.trailing_metadata_valid = true; if (0 == strcmp(trailers->headers[i].key, "grpc-status") && @@ -554,8 +557,10 @@ static void on_response_trailers_received( s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; gpr_mu_unlock(&s->mu); + grpc_exec_ctx_finish(&exec_ctx); } else { gpr_mu_unlock(&s->mu); + grpc_exec_ctx_finish(&exec_ctx); execute_from_storage(s); } } |