aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/README.md5
-rw-r--r--src/core/ext/census/grpc_filter.c3
-rw-r--r--src/core/ext/census/grpc_plugin.c3
-rw-r--r--src/core/ext/client_channel/channel_connectivity.c8
-rw-r--r--src/core/ext/client_channel/client_channel.c78
-rw-r--r--src/core/ext/client_channel/client_channel_factory.c6
-rw-r--r--src/core/ext/client_channel/client_channel_plugin.c13
-rw-r--r--src/core/ext/client_channel/http_connect_handshaker.c57
-rw-r--r--src/core/ext/client_channel/http_connect_handshaker.h5
-rw-r--r--src/core/ext/client_channel/lb_policy_factory.c10
-rw-r--r--src/core/ext/client_channel/lb_policy_factory.h5
-rw-r--r--src/core/ext/client_channel/subchannel.c30
-rw-r--r--src/core/ext/client_channel/subchannel_index.c2
-rw-r--r--src/core/ext/client_channel/uri_parser.c44
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c83
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c28
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c24
-rw-r--r--src/core/ext/load_reporting/load_reporting.c3
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c15
-rw-r--r--src/core/ext/resolver/README.md4
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c27
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c20
-rw-r--r--src/core/ext/transport/README.md1
-rw-r--r--src/core/ext/transport/chttp2/README.md1
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.c36
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.h12
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c16
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c2
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c68
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.c50
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.h39
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c3
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c2
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c51
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_decoder.c15
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_decoder.h5
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_encoder.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c241
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.c57
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.h6
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c73
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.h6
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_table.c35
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_table.h13
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.c4
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c10
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c11
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c52
50 files changed, 619 insertions, 671 deletions
diff --git a/src/core/ext/README.md b/src/core/ext/README.md
new file mode 100644
index 0000000000..0812b20823
--- /dev/null
+++ b/src/core/ext/README.md
@@ -0,0 +1,5 @@
+Optional plugins for gRPC Core: Modules in this directory extend gRPC Core in
+useful ways. All optional code belongs here.
+
+NOTE: The movement of code between lib and ext is an ongoing effort, so this
+directory currently contains too much of the core library.
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 3e8acc85e1..8e4d4732b8 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -154,7 +154,8 @@ static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx,
memset(d, 0, sizeof(*d));
d->start_ts = args->start_time;
/* TODO(hongyu): call census_tracing_start_op here. */
- grpc_closure_init(&d->finish_recv, server_on_done_recv, elem);
+ grpc_closure_init(&d->finish_recv, server_on_done_recv, elem,
+ grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/census/grpc_plugin.c b/src/core/ext/census/grpc_plugin.c
index e43ceafd0c..c9fe453af8 100644
--- a/src/core/ext/census/grpc_plugin.c
+++ b/src/core/ext/census/grpc_plugin.c
@@ -51,7 +51,8 @@ static bool is_census_enabled(const grpc_channel_args *a) {
return census_enabled();
}
-static bool maybe_add_census_filter(grpc_channel_stack_builder *builder,
+static bool maybe_add_census_filter(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder,
void *arg) {
const grpc_channel_args *args =
grpc_channel_stack_builder_get_channel_arguments(builder);
diff --git a/src/core/ext/client_channel/channel_connectivity.c b/src/core/ext/client_channel/channel_connectivity.c
index 9797e66564..dd70bc2c6c 100644
--- a/src/core/ext/client_channel/channel_connectivity.c
+++ b/src/core/ext/client_channel/channel_connectivity.c
@@ -76,6 +76,7 @@ typedef struct {
gpr_mu mu;
callback_phase phase;
grpc_closure on_complete;
+ grpc_closure on_timeout;
grpc_timer alarm;
grpc_connectivity_state state;
grpc_completion_queue *cq;
@@ -198,7 +199,10 @@ void grpc_channel_watch_connectivity_state(
grpc_cq_begin_op(cq, tag);
gpr_mu_init(&w->mu);
- grpc_closure_init(&w->on_complete, watch_complete, w);
+ grpc_closure_init(&w->on_complete, watch_complete, w,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&w->on_timeout, timeout_complete, w,
+ grpc_schedule_on_exec_ctx);
w->phase = WAITING;
w->state = last_observed_state;
w->cq = cq;
@@ -207,7 +211,7 @@ void grpc_channel_watch_connectivity_state(
grpc_timer_init(&exec_ctx, &w->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
+ &w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));
if (client_channel_elem->filter == &grpc_client_channel_filter) {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 9d46338428..2f25fef9a7 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -83,8 +83,12 @@ static void *method_parameters_copy(void *value) {
return new_value;
}
+static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *p) {
+ gpr_free(p);
+}
+
static const grpc_mdstr_hash_table_vtable method_parameters_vtable = {
- gpr_free, method_parameters_copy};
+ method_parameters_free, method_parameters_copy};
static void *method_parameters_create_from_json(const grpc_json *json) {
wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
@@ -249,7 +253,8 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
w->chand = chand;
- grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
+ grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w,
+ grpc_schedule_on_exec_ctx);
w->state = current_state;
w->lb_policy = lb_policy;
grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
@@ -327,7 +332,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_service_config_create(service_config_json);
if (service_config != NULL) {
method_params_table = grpc_service_config_create_method_config_table(
- service_config, method_parameters_create_from_json,
+ exec_ctx, service_config, method_parameters_create_from_json,
&method_parameters_vtable);
grpc_service_config_destroy(service_config);
}
@@ -336,7 +341,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
// be pointing to data inside chand->resolver_result.
// The copy will be saved in chand->lb_policy_name below.
lb_policy_name = gpr_strdup(lb_policy_name);
- grpc_channel_args_destroy(chand->resolver_result);
+ grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
chand->resolver_result = NULL;
}
@@ -357,18 +362,16 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
chand->service_config_json = service_config_json;
}
if (chand->method_params_table != NULL) {
- grpc_mdstr_hash_table_unref(chand->method_params_table);
+ grpc_mdstr_hash_table_unref(exec_ctx, chand->method_params_table);
}
chand->method_params_table = method_params_table;
if (lb_policy != NULL) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
- NULL);
+ grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
} else if (chand->resolver == NULL /* disconnected */) {
grpc_closure_list_fail_all(
&chand->waiting_for_config_closures,
GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1));
- grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
- NULL);
+ grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
}
if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
@@ -425,7 +428,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_transport_op *op) {
channel_data *chand = elem->channel_data;
- grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
GPR_ASSERT(op->set_accept_stream == false);
if (op->bind_pollset != NULL) {
@@ -444,9 +447,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
if (op->send_ping != NULL) {
if (chand->lb_policy == NULL) {
- grpc_exec_ctx_sched(exec_ctx, op->send_ping,
- GRPC_ERROR_CREATE("Ping with no load balancing"),
- NULL);
+ grpc_closure_sched(exec_ctx, op->send_ping,
+ GRPC_ERROR_CREATE("Ping with no load balancing"));
} else {
grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
op->bind_pollset = NULL;
@@ -465,8 +467,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
if (!chand->started_resolving) {
grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
GRPC_ERROR_REF(op->disconnect_with_error));
- grpc_exec_ctx_enqueue_list(exec_ctx,
- &chand->waiting_for_config_closures, NULL);
+ grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
}
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
@@ -511,7 +512,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
gpr_mu_init(&chand->mu);
chand->owning_stack = args->channel_stack;
grpc_closure_init(&chand->on_resolver_result_changed,
- on_resolver_result_changed, chand);
+ on_resolver_result_changed, chand,
+ grpc_schedule_on_exec_ctx);
chand->interested_parties = grpc_pollset_set_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
@@ -556,7 +558,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
gpr_free(chand->lb_policy_name);
gpr_free(chand->service_config_json);
if (chand->method_params_table != NULL) {
- grpc_mdstr_hash_table_unref(chand->method_params_table);
+ grpc_mdstr_hash_table_unref(exec_ctx, chand->method_params_table);
}
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
grpc_pollset_set_destroy(chand->interested_parties);
@@ -678,8 +680,9 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
calld->waiting_ops_count = 0;
calld->waiting_ops_capacity = 0;
GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
- grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a),
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_create(retry_ops, a, grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
}
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
@@ -761,14 +764,14 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (error != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
+ grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error));
} else {
call_data *calld = cpa->elem->call_data;
gpr_mu_lock(&calld->mu);
if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags, cpa->connected_subchannel,
cpa->on_ready, GRPC_ERROR_NONE)) {
- grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
}
gpr_mu_unlock(&calld->mu);
}
@@ -800,9 +803,9 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
cpa = closure->cb_arg;
if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, cpa->on_ready,
- GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1));
}
}
gpr_mu_unlock(&chand->mu);
@@ -853,12 +856,12 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
cpa->connected_subchannel = connected_subchannel;
cpa->on_ready = on_ready;
cpa->elem = elem;
- grpc_closure_init(&cpa->closure, continue_picking, cpa);
+ grpc_closure_init(&cpa->closure, continue_picking, cpa,
+ grpc_schedule_on_exec_ctx);
grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
GRPC_ERROR_NONE);
} else {
- grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"),
- NULL);
+ grpc_closure_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"));
}
gpr_mu_unlock(&chand->mu);
@@ -943,7 +946,8 @@ retry:
calld->connected_subchannel == NULL &&
op->send_initial_metadata != NULL) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
- grpc_closure_init(&calld->next_step, subchannel_ready, elem);
+ grpc_closure_init(&calld->next_step, subchannel_ready, elem,
+ grpc_schedule_on_exec_ctx);
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data's interested_parties, so
@@ -1001,8 +1005,8 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&chand->mu);
// If the method config table was present, use it.
if (method_params_table != NULL) {
- const method_parameters *method_params =
- grpc_method_config_table_get(method_params_table, calld->path);
+ const method_parameters *method_params = grpc_method_config_table_get(
+ exec_ctx, method_params_table, calld->path);
if (method_params != NULL) {
const bool have_method_timeout =
gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0;
@@ -1025,7 +1029,7 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&calld->mu);
}
}
- grpc_mdstr_hash_table_unref(method_params_table);
+ grpc_mdstr_hash_table_unref(exec_ctx, method_params_table);
}
}
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config");
@@ -1066,8 +1070,8 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_mdstr_hash_table *method_params_table =
grpc_mdstr_hash_table_ref(chand->method_params_table);
gpr_mu_unlock(&chand->mu);
- method_parameters *method_params =
- grpc_method_config_table_get(method_params_table, args->path);
+ method_parameters *method_params = grpc_method_config_table_get(
+ exec_ctx, method_params_table, args->path);
if (method_params != NULL) {
if (gpr_time_cmp(method_params->timeout,
gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) {
@@ -1080,7 +1084,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
method_params->wait_for_ready;
}
}
- grpc_mdstr_hash_table_unref(method_params_table);
+ grpc_mdstr_hash_table_unref(exec_ctx, method_params_table);
} else {
gpr_mu_unlock(&chand->mu);
}
@@ -1089,7 +1093,8 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
// get the service config data once the resolver returns.
// Take a reference to the call stack to be owned by the callback.
GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config");
- grpc_closure_init(&calld->read_service_config, read_service_config, elem);
+ grpc_closure_init(&calld->read_service_config, read_service_config, elem,
+ grpc_schedule_on_exec_ctx);
grpc_closure_list_append(&chand->waiting_for_config_closures,
&calld->read_service_config, GRPC_ERROR_NONE);
gpr_mu_unlock(&chand->mu);
@@ -1108,7 +1113,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
void *and_free_memory) {
call_data *calld = elem->call_data;
grpc_deadline_state_destroy(exec_ctx, elem);
- GRPC_MDSTR_UNREF(calld->path);
+ GRPC_MDSTR_UNREF(exec_ctx, calld->path);
GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
@@ -1202,7 +1207,8 @@ void grpc_client_channel_watch_connectivity_state(
w->pollset = pollset;
w->on_complete = on_complete;
grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
- grpc_closure_init(&w->my_closure, on_external_watch_complete, w);
+ grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
+ grpc_schedule_on_exec_ctx);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
gpr_mu_lock(&chand->mu);
diff --git a/src/core/ext/client_channel/client_channel_factory.c b/src/core/ext/client_channel/client_channel_factory.c
index 4eb35dfcf7..d2707a1556 100644
--- a/src/core/ext/client_channel/client_channel_factory.c
+++ b/src/core/ext/client_channel/client_channel_factory.c
@@ -61,12 +61,10 @@ static void* factory_arg_copy(void* factory) {
return factory;
}
-static void factory_arg_destroy(void* factory) {
+static void factory_arg_destroy(grpc_exec_ctx* exec_ctx, void* factory) {
// TODO(roth): Remove local exec_ctx when
// https://github.com/grpc/grpc/pull/8705 is merged.
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_client_channel_factory_unref(&exec_ctx, factory);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_client_channel_factory_unref(exec_ctx, factory);
}
static int factory_arg_cmp(void* factory1, void* factory2) {
diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c
index a3e5079843..d50bba60f6 100644
--- a/src/core/ext/client_channel/client_channel_plugin.c
+++ b/src/core/ext/client_channel/client_channel_plugin.c
@@ -38,17 +38,20 @@
#include <grpc/support/alloc.h>
#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/ext/client_channel/http_connect_handshaker.h"
#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/client_channel/subchannel_index.h"
#include "src/core/lib/surface/channel_init.h"
-static bool append_filter(grpc_channel_stack_builder *builder, void *arg) {
+static bool append_filter(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder, void *arg) {
return grpc_channel_stack_builder_append_filter(
builder, (const grpc_channel_filter *)arg, NULL, NULL);
}
-static bool set_default_host_if_unset(grpc_channel_stack_builder *builder,
+static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder,
void *unused) {
const grpc_channel_args *args =
grpc_channel_stack_builder_get_channel_arguments(builder);
@@ -66,9 +69,10 @@ static bool set_default_host_if_unset(grpc_channel_stack_builder *builder,
arg.key = GRPC_ARG_DEFAULT_AUTHORITY;
arg.value.string = default_authority;
grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
- grpc_channel_stack_builder_set_channel_arguments(builder, new_args);
+ grpc_channel_stack_builder_set_channel_arguments(exec_ctx, builder,
+ new_args);
gpr_free(default_authority);
- grpc_channel_args_destroy(new_args);
+ grpc_channel_args_destroy(exec_ctx, new_args);
}
return true;
}
@@ -81,6 +85,7 @@ void grpc_client_channel_init(void) {
set_default_host_if_unset, NULL);
grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter,
(void *)&grpc_client_channel_filter);
+ grpc_http_connect_register_handshaker_factory();
}
void grpc_client_channel_shutdown(void) {
diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c
index 76c78ee853..ace804c47f 100644
--- a/src/core/ext/client_channel/http_connect_handshaker.c
+++ b/src/core/ext/client_channel/http_connect_handshaker.c
@@ -44,8 +44,10 @@
#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/env.h"
typedef struct http_connect_handshaker {
@@ -83,11 +85,12 @@ static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx,
grpc_endpoint_destroy(exec_ctx, handshaker->endpoint_to_destroy);
}
if (handshaker->read_buffer_to_destroy != NULL) {
- grpc_slice_buffer_destroy(handshaker->read_buffer_to_destroy);
+ grpc_slice_buffer_destroy_internal(exec_ctx,
+ handshaker->read_buffer_to_destroy);
gpr_free(handshaker->read_buffer_to_destroy);
}
gpr_free(handshaker->proxy_server);
- grpc_slice_buffer_destroy(&handshaker->write_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &handshaker->write_buffer);
grpc_http_parser_destroy(&handshaker->http_parser);
grpc_http_response_destroy(&handshaker->http_response);
gpr_free(handshaker);
@@ -97,12 +100,12 @@ static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx,
// Set args fields to NULL, saving the endpoint and read buffer for
// later destruction.
static void cleanup_args_for_failure_locked(
- http_connect_handshaker* handshaker) {
+ grpc_exec_ctx* exec_ctx, http_connect_handshaker* handshaker) {
handshaker->endpoint_to_destroy = handshaker->args->endpoint;
handshaker->args->endpoint = NULL;
handshaker->read_buffer_to_destroy = handshaker->args->read_buffer;
handshaker->args->read_buffer = NULL;
- grpc_channel_args_destroy(handshaker->args->args);
+ grpc_channel_args_destroy(exec_ctx, handshaker->args->args);
handshaker->args->args = NULL;
}
@@ -125,13 +128,13 @@ static void handshake_failed_locked(grpc_exec_ctx* exec_ctx,
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
// Not shutting down, so the handshake failed. Clean up before
// invoking the callback.
- cleanup_args_for_failure_locked(handshaker);
+ cleanup_args_for_failure_locked(exec_ctx, handshaker);
// Set shutdown to true so that subsequent calls to
// http_connect_handshaker_shutdown() do nothing.
handshaker->shutdown = true;
}
// Invoke callback.
- grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
+ grpc_closure_sched(exec_ctx, handshaker->on_handshake_done, error);
}
// Callback invoked when finished writing HTTP CONNECT request.
@@ -193,7 +196,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
&handshaker->args->read_buffer->slices[i + 1],
handshaker->args->read_buffer->count - i - 1);
grpc_slice_buffer_swap(handshaker->args->read_buffer, &tmp_buffer);
- grpc_slice_buffer_destroy(&tmp_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &tmp_buffer);
break;
}
}
@@ -210,7 +213,8 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
// complete (e.g., handling chunked transfer encoding or looking
// at the Content-Length: header).
if (handshaker->http_parser.state != GRPC_HTTP_BODY) {
- grpc_slice_buffer_reset_and_unref(handshaker->args->read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ handshaker->args->read_buffer);
grpc_endpoint_read(exec_ctx, handshaker->args->endpoint,
handshaker->args->read_buffer,
&handshaker->response_read_closure);
@@ -229,7 +233,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
goto done;
}
// Success. Invoke handshake-done callback.
- grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
+ grpc_closure_sched(exec_ctx, handshaker->on_handshake_done, error);
done:
// Set shutdown to true so that subsequent calls to
// http_connect_handshaker_shutdown() do nothing.
@@ -255,7 +259,7 @@ static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
if (!handshaker->shutdown) {
handshaker->shutdown = true;
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
- cleanup_args_for_failure_locked(handshaker);
+ cleanup_args_for_failure_locked(exec_ctx, handshaker);
}
gpr_mu_unlock(&handshaker->mu);
}
@@ -313,9 +317,9 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server) {
handshaker->proxy_server = gpr_strdup(proxy_server);
grpc_slice_buffer_init(&handshaker->write_buffer);
grpc_closure_init(&handshaker->request_done_closure, on_write_done,
- handshaker);
+ handshaker, grpc_schedule_on_exec_ctx);
grpc_closure_init(&handshaker->response_read_closure, on_read_done,
- handshaker);
+ handshaker, grpc_schedule_on_exec_ctx);
grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE,
&handshaker->http_response);
return &handshaker->base;
@@ -344,3 +348,32 @@ done:
grpc_uri_destroy(uri);
return proxy_name;
}
+
+//
+// handshaker factory
+//
+
+static void handshaker_factory_add_handshakers(
+ grpc_exec_ctx* exec_ctx, grpc_handshaker_factory* factory,
+ const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) {
+ char* proxy_name = grpc_get_http_proxy_server();
+ if (proxy_name != NULL) {
+ grpc_handshake_manager_add(handshake_mgr,
+ grpc_http_connect_handshaker_create(proxy_name));
+ gpr_free(proxy_name);
+ }
+}
+
+static void handshaker_factory_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker_factory* factory) {}
+
+static const grpc_handshaker_factory_vtable handshaker_factory_vtable = {
+ handshaker_factory_add_handshakers, handshaker_factory_destroy};
+
+static grpc_handshaker_factory handshaker_factory = {
+ &handshaker_factory_vtable};
+
+void grpc_http_connect_register_handshaker_factory() {
+ grpc_handshaker_factory_register(true /* at_start */, HANDSHAKER_CLIENT,
+ &handshaker_factory);
+}
diff --git a/src/core/ext/client_channel/http_connect_handshaker.h b/src/core/ext/client_channel/http_connect_handshaker.h
index ea293852e6..56485f1373 100644
--- a/src/core/ext/client_channel/http_connect_handshaker.h
+++ b/src/core/ext/client_channel/http_connect_handshaker.h
@@ -36,11 +36,14 @@
#include "src/core/lib/channel/handshaker.h"
-/// Does NOT take ownership of \a proxy_server.
+/// Creates a new HTTP CONNECT handshaker.
grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server);
/// Returns the name of the proxy to use, or NULL if no proxy is configured.
/// Caller takes ownership of result.
char* grpc_get_http_proxy_server();
+/// Registers handshaker factory.
+void grpc_http_connect_register_handshaker_factory();
+
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H */
diff --git a/src/core/ext/client_channel/lb_policy_factory.c b/src/core/ext/client_channel/lb_policy_factory.c
index 8a474c8818..7af9bb0411 100644
--- a/src/core/ext/client_channel/lb_policy_factory.c
+++ b/src/core/ext/client_channel/lb_policy_factory.c
@@ -112,11 +112,13 @@ int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1,
return 0;
}
-void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses) {
+void grpc_lb_addresses_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_lb_addresses* addresses) {
for (size_t i = 0; i < addresses->num_addresses; ++i) {
gpr_free(addresses->addresses[i].balancer_name);
if (addresses->addresses[i].user_data != NULL) {
- addresses->user_data_vtable->destroy(addresses->addresses[i].user_data);
+ addresses->user_data_vtable->destroy(exec_ctx,
+ addresses->addresses[i].user_data);
}
}
gpr_free(addresses->addresses);
@@ -126,8 +128,8 @@ void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses) {
static void* lb_addresses_copy(void* addresses) {
return grpc_lb_addresses_copy(addresses);
}
-static void lb_addresses_destroy(void* addresses) {
- grpc_lb_addresses_destroy(addresses);
+static void lb_addresses_destroy(grpc_exec_ctx* exec_ctx, void* addresses) {
+ grpc_lb_addresses_destroy(exec_ctx, addresses);
}
static int lb_addresses_cmp(void* addresses1, void* addresses2) {
return grpc_lb_addresses_cmp(addresses1, addresses2);
diff --git a/src/core/ext/client_channel/lb_policy_factory.h b/src/core/ext/client_channel/lb_policy_factory.h
index 79b3dee259..9b8b03f982 100644
--- a/src/core/ext/client_channel/lb_policy_factory.h
+++ b/src/core/ext/client_channel/lb_policy_factory.h
@@ -64,7 +64,7 @@ typedef struct grpc_lb_address {
typedef struct grpc_lb_user_data_vtable {
void *(*copy)(void *);
- void (*destroy)(void *);
+ void (*destroy)(grpc_exec_ctx *exec_ctx, void *);
int (*cmp)(void *, void *);
} grpc_lb_user_data_vtable;
@@ -96,7 +96,8 @@ int grpc_lb_addresses_cmp(const grpc_lb_addresses *addresses1,
const grpc_lb_addresses *addresses2);
/** Destroys \a addresses. */
-void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses);
+void grpc_lb_addresses_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_lb_addresses *addresses);
/** Returns a channel arg containing \a addresses. */
grpc_arg grpc_lb_addresses_create_channel_arg(
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index f294e69392..1bac82b451 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -46,6 +46,7 @@
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/backoff.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
@@ -108,6 +109,9 @@ struct grpc_subchannel {
/** callback for connection finishing */
grpc_closure connected;
+ /** callback for our alarm */
+ grpc_closure on_alarm;
+
/** pollset_set tracking who's interested in a connection
being setup */
grpc_pollset_set *pollset_set;
@@ -206,9 +210,9 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_subchannel *c = arg;
gpr_free((void *)c->filters);
- grpc_channel_args_destroy(c->args);
+ grpc_channel_args_destroy(exec_ctx, c->args);
gpr_free(c->addr);
- grpc_slice_unref(c->initial_connect_string);
+ grpc_slice_unref_internal(exec_ctx, c->initial_connect_string);
grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
grpc_connector_unref(exec_ctx, c->connector);
grpc_pollset_set_destroy(c->pollset_set);
@@ -293,8 +297,9 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
gpr_atm old_refs;
old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
if (old_refs == 1) {
- grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c),
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
}
}
@@ -330,7 +335,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
c->args = grpc_channel_args_copy(args->args);
c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
&c->root_external_state_watcher;
- grpc_closure_init(&c->connected, subchannel_connected, c);
+ grpc_closure_init(&c->connected, subchannel_connected, c,
+ grpc_schedule_on_exec_ctx);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
int initial_backoff_ms =
@@ -480,7 +486,8 @@ static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
time_til_next.tv_sec, time_til_next.tv_nsec);
}
- grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
+ grpc_closure_init(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
+ grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, &c->on_alarm, now);
}
}
@@ -505,7 +512,8 @@ void grpc_subchannel_notify_on_state_change(
w->subchannel = c;
w->pollset_set = interested_parties;
w->notify = notify;
- grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
+ grpc_closure_init(&w->closure, on_external_state_watcher_done, w,
+ grpc_schedule_on_exec_ctx);
if (interested_parties != NULL) {
grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set,
interested_parties);
@@ -600,13 +608,13 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
/* construct channel stack */
grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
grpc_channel_stack_builder_set_channel_arguments(
- builder, c->connecting_result.channel_args);
+ exec_ctx, builder, c->connecting_result.channel_args);
grpc_channel_stack_builder_set_transport(builder,
c->connecting_result.transport);
if (!grpc_channel_init_create_stack(exec_ctx, builder,
GRPC_CLIENT_SUBCHANNEL)) {
- grpc_channel_stack_builder_destroy(builder);
+ grpc_channel_stack_builder_destroy(exec_ctx, builder);
abort(); /* TODO(ctiller): what to do here (previously we just crashed) */
}
grpc_error *error = grpc_channel_stack_builder_finish(
@@ -626,7 +634,7 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
sw_subchannel->subchannel = c;
sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed,
- sw_subchannel);
+ sw_subchannel, grpc_schedule_on_exec_ctx);
if (c->disconnected) {
gpr_free(sw_subchannel);
@@ -686,7 +694,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
}
gpr_mu_unlock(&c->mu);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected");
- grpc_channel_args_destroy(delete_channel_args);
+ grpc_channel_args_destroy(exec_ctx, delete_channel_args);
}
/*
diff --git a/src/core/ext/client_channel/subchannel_index.c b/src/core/ext/client_channel/subchannel_index.c
index a1ba5e945c..1ebe03ef11 100644
--- a/src/core/ext/client_channel/subchannel_index.c
+++ b/src/core/ext/client_channel/subchannel_index.c
@@ -128,7 +128,7 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *k) {
grpc_connector_unref(exec_ctx, k->connector);
gpr_free((grpc_channel_args *)k->args.filters);
- grpc_channel_args_destroy((grpc_channel_args *)k->args.args);
+ grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)k->args.args);
gpr_free(k->args.addr);
gpr_free(k);
}
diff --git a/src/core/ext/client_channel/uri_parser.c b/src/core/ext/client_channel/uri_parser.c
index 0fbc542ef8..f8c946b275 100644
--- a/src/core/ext/client_channel/uri_parser.c
+++ b/src/core/ext/client_channel/uri_parser.c
@@ -42,7 +42,6 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
-#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
/** a size_t default value... maps to all 1's */
@@ -138,7 +137,6 @@ static int parse_fragment_or_query(const char *uri_text, size_t *i) {
return 1;
}
-static void do_nothing(void *ignored) {}
static void parse_query_parts(grpc_uri *uri) {
static const char *QUERY_PARTS_SEPARATOR = "&";
static const char *QUERY_PARTS_VALUE_SEPARATOR = "=";
@@ -149,38 +147,32 @@ static void parse_query_parts(grpc_uri *uri) {
uri->num_query_parts = 0;
return;
}
- grpc_slice query_slice =
- grpc_slice_new(uri->query, strlen(uri->query), do_nothing);
- grpc_slice_buffer query_parts; /* the &-separated elements of the query */
- grpc_slice_buffer query_param_parts; /* the =-separated subelements */
- grpc_slice_buffer_init(&query_parts);
- grpc_slice_buffer_init(&query_param_parts);
-
- grpc_slice_split(query_slice, QUERY_PARTS_SEPARATOR, &query_parts);
- uri->query_parts = gpr_malloc(query_parts.count * sizeof(char *));
- uri->query_parts_values = gpr_malloc(query_parts.count * sizeof(char *));
- uri->num_query_parts = query_parts.count;
- for (size_t i = 0; i < query_parts.count; i++) {
- grpc_slice_split(query_parts.slices[i], QUERY_PARTS_VALUE_SEPARATOR,
- &query_param_parts);
- GPR_ASSERT(query_param_parts.count > 0);
- uri->query_parts[i] =
- grpc_dump_slice(query_param_parts.slices[0], GPR_DUMP_ASCII);
- if (query_param_parts.count > 1) {
+ gpr_string_split(uri->query, QUERY_PARTS_SEPARATOR, &uri->query_parts,
+ &uri->num_query_parts);
+ uri->query_parts_values = gpr_malloc(uri->num_query_parts * sizeof(char **));
+ for (size_t i = 0; i < uri->num_query_parts; i++) {
+ char **query_param_parts;
+ size_t num_query_param_parts;
+ char *full = uri->query_parts[i];
+ gpr_string_split(full, QUERY_PARTS_VALUE_SEPARATOR, &query_param_parts,
+ &num_query_param_parts);
+ GPR_ASSERT(num_query_param_parts > 0);
+ uri->query_parts[i] = query_param_parts[0];
+ if (num_query_param_parts > 1) {
/* TODO(dgq): only the first value after the separator is considered.
* Perhaps all chars after the first separator for the query part should
* be included, even if they include the separator. */
- uri->query_parts_values[i] =
- grpc_dump_slice(query_param_parts.slices[1], GPR_DUMP_ASCII);
+ uri->query_parts_values[i] = query_param_parts[1];
} else {
uri->query_parts_values[i] = NULL;
}
- grpc_slice_buffer_reset_and_unref(&query_param_parts);
+ for (size_t j = 2; j < num_query_param_parts; j++) {
+ gpr_free(query_param_parts[j]);
+ }
+ gpr_free(query_param_parts);
+ gpr_free(full);
}
- grpc_slice_buffer_destroy(&query_parts);
- grpc_slice_buffer_destroy(&query_param_parts);
- grpc_slice_unref(query_slice);
}
grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) {
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index bed5e6c901..97f98df03a 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -117,6 +117,7 @@
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/backoff.h"
#include "src/core/lib/support/string.h"
@@ -180,8 +181,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
wrapped_rr_closure_arg *wc_arg = arg;
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
- grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
if (wc_arg->rr_policy != NULL) {
/* if *target is NULL, no pick has been made by the RR policy (eg, all
@@ -248,7 +248,8 @@ static void add_pending_pick(pending_pick **root,
pick_args->lb_token_mdelem_storage;
pp->wrapped_on_complete_arg.free_when_done = pp;
grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure,
- wrapped_rr_closure, &pp->wrapped_on_complete_arg);
+ wrapped_rr_closure, &pp->wrapped_on_complete_arg,
+ grpc_schedule_on_exec_ctx);
*root = pp;
}
@@ -268,7 +269,8 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
pping->wrapped_notify_arg.free_when_done = pping;
pping->next = *root;
grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure,
- wrapped_rr_closure, &pping->wrapped_notify_arg);
+ wrapped_rr_closure, &pping->wrapped_notify_arg,
+ grpc_schedule_on_exec_ctx);
*root = pping;
}
@@ -325,6 +327,9 @@ typedef struct glb_lb_policy {
/* A response from the LB server has been received. Process it */
grpc_closure lb_on_response_received;
+ /* LB call retry timer callback. */
+ grpc_closure lb_on_call_retry;
+
grpc_call *lb_call; /* streaming call to the LB server, */
grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
@@ -385,8 +390,8 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
static void *lb_token_copy(void *token) {
return token == NULL ? NULL : GRPC_MDELEM_REF(token);
}
-static void lb_token_destroy(void *token) {
- if (token != NULL) GRPC_MDELEM_UNREF(token);
+static void lb_token_destroy(grpc_exec_ctx *exec_ctx, void *token) {
+ if (token != NULL) GRPC_MDELEM_UNREF(exec_ctx, token);
}
static int lb_token_cmp(void *token1, void *token2) {
if (token1 > token2) return 1;
@@ -420,7 +425,7 @@ static void parse_server(const grpc_grpclb_server *server,
/* Returns addresses extracted from \a serverlist. */
static grpc_lb_addresses *process_serverlist_locked(
- const grpc_grpclb_serverlist *serverlist) {
+ grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist) {
size_t num_valid = 0;
/* first pass: count how many are valid in order to allocate the necessary
* memory in a single block */
@@ -456,8 +461,8 @@ static grpc_lb_addresses *process_serverlist_locked(
strnlen(server->load_balance_token, lb_token_max_length);
grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
(uint8_t *)server->load_balance_token, lb_token_length);
- user_data = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LB_TOKEN,
- lb_token_mdstr);
+ user_data = grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_LB_TOKEN, lb_token_mdstr);
} else {
char *uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
@@ -580,7 +585,8 @@ static grpc_lb_policy *create_rr_locked(
grpc_lb_policy_args args;
memset(&args, 0, sizeof(args));
args.client_channel_factory = glb_policy->cc_factory;
- grpc_lb_addresses *addresses = process_serverlist_locked(serverlist);
+ grpc_lb_addresses *addresses =
+ process_serverlist_locked(exec_ctx, serverlist);
// Replace the LB addresses in the channel args that we pass down to
// the subchannel.
@@ -592,8 +598,8 @@ static grpc_lb_policy *create_rr_locked(
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
GPR_ASSERT(rr != NULL);
- grpc_lb_addresses_destroy(addresses);
- grpc_channel_args_destroy(args.args);
+ grpc_lb_addresses_destroy(exec_ctx, addresses);
+ grpc_channel_args_destroy(exec_ctx, args.args);
return rr;
}
@@ -667,7 +673,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
gpr_malloc(sizeof(rr_connectivity_data));
memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
- rr_connectivity);
+ rr_connectivity, grpc_schedule_on_exec_ctx);
rr_connectivity->glb_policy = glb_policy;
rr_connectivity->state = new_rr_state;
@@ -839,7 +845,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
exec_ctx, glb_policy->cc_factory, target_uri_str,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args);
- grpc_channel_args_destroy(new_args);
+ grpc_channel_args_destroy(exec_ctx, new_args);
gpr_free(target_uri_str);
for (size_t i = 0; i < num_grpclb_addrs; i++) {
@@ -865,7 +871,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GPR_ASSERT(glb_policy->pending_picks == NULL);
GPR_ASSERT(glb_policy->pending_pings == NULL);
gpr_free((void *)glb_policy->server_name);
- grpc_channel_args_destroy(glb_policy->args);
+ grpc_channel_args_destroy(exec_ctx, glb_policy->args);
grpc_channel_destroy(glb_policy->lb_channel);
glb_policy->lb_channel = NULL;
grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
@@ -908,15 +914,15 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_ERROR_NONE);
pp = next;
}
while (pping != NULL) {
pending_ping *next = pping->next;
- grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
+ GRPC_ERROR_NONE);
pping = next;
}
}
@@ -932,9 +938,9 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -957,9 +963,9 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -994,11 +1000,10 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *on_complete) {
if (pick_args->lb_token_mdelem_storage == NULL) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, on_complete,
GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
- "won't work without it. Failing"),
- NULL);
+ "won't work without it. Failing"));
return 0;
}
@@ -1017,7 +1022,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg));
- grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg);
+ grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
+ grpc_schedule_on_exec_ctx);
wc_arg->rr_policy = glb_policy->rr_policy;
wc_arg->target = target;
wc_arg->wrapped_closure = on_complete;
@@ -1088,7 +1094,8 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
-static void lb_call_init_locked(glb_lb_policy *glb_policy) {
+static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->server_name != NULL);
GPR_ASSERT(glb_policy->server_name[0] != '\0');
GPR_ASSERT(!glb_policy->shutting_down);
@@ -1097,7 +1104,7 @@ static void lb_call_init_locked(glb_lb_policy *glb_policy) {
* glb_policy->base.interested_parties, which is comprised of the polling
* entities from \a client_channel. */
glb_policy->lb_call = grpc_channel_create_pollset_set_call(
- glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
+ exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
glb_policy->base.interested_parties,
"/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name,
glb_policy->deadline, NULL);
@@ -1110,16 +1117,18 @@ static void lb_call_init_locked(glb_lb_policy *glb_policy) {
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
glb_policy->lb_request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_slice_unref(request_payload_slice);
+ grpc_slice_unref_internal(exec_ctx, request_payload_slice);
grpc_grpclb_request_destroy(request);
glb_policy->lb_call_status_details = NULL;
glb_policy->lb_call_status_details_capacity = 0;
grpc_closure_init(&glb_policy->lb_on_server_status_received,
- lb_on_server_status_received, glb_policy);
+ lb_on_server_status_received, glb_policy,
+ grpc_schedule_on_exec_ctx);
grpc_closure_init(&glb_policy->lb_on_response_received,
- lb_on_response_received, glb_policy);
+ lb_on_response_received, glb_policy,
+ grpc_schedule_on_exec_ctx);
gpr_backoff_init(&glb_policy->lb_call_backoff_state,
GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
@@ -1149,7 +1158,7 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(glb_policy->lb_channel != NULL);
if (glb_policy->shutting_down) return;
- lb_call_init_locked(glb_policy);
+ lb_call_init_locked(exec_ctx, glb_policy);
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)",
@@ -1234,7 +1243,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
grpc_grpclb_response_parse_serverlist(response_slice);
if (serverlist != NULL) {
GPR_ASSERT(glb_policy->lb_call != NULL);
- grpc_slice_unref(response_slice);
+ grpc_slice_unref_internal(exec_ctx, response_slice);
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Serverlist with %lu servers received",
(unsigned long)serverlist->num_servers);
@@ -1278,7 +1287,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
} else { /* serverlist == NULL */
gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
- grpc_slice_unref(response_slice);
+ grpc_slice_unref_internal(exec_ctx, response_slice);
}
if (!glb_policy->shutting_down) {
@@ -1358,8 +1367,10 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
}
}
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
+ grpc_closure_init(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer,
+ glb_policy, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
- lb_call_on_retry_timer, glb_policy, now);
+ &glb_policy->lb_on_call_retry, now);
}
gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index b9cfe6b5c0..821becff69 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -120,7 +120,7 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
pp = next;
}
@@ -138,9 +138,9 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -165,9 +165,9 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -306,14 +306,15 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
- grpc_exec_ctx_sched(exec_ctx,
- grpc_closure_create(destroy_subchannels, p),
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_create(destroy_subchannels, p,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
@@ -366,8 +367,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE,
- NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
@@ -419,8 +419,7 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (selected) {
grpc_connected_subchannel_ping(exec_ctx, selected, closure);
} else {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"));
}
}
@@ -485,7 +484,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
p->num_subchannels = subchannel_idx;
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
- grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
+ grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p,
+ grpc_schedule_on_exec_ctx);
gpr_mu_init(&p->mu);
return &p->base;
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index f0305473d2..cb679489c3 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -284,7 +284,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
- sd->user_data_vtable->destroy(sd->user_data);
+ sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
}
gpr_free(sd);
}
@@ -321,8 +321,8 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE("Channel Shutdown"), NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE("Channel Shutdown"));
gpr_free(pp);
}
grpc_connectivity_state_set(
@@ -348,9 +348,9 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1));
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -376,9 +376,9 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
*pp->target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1));
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -581,7 +581,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
(void *)selected->subchannel, (void *)selected);
}
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
update_lb_connectivity_status(exec_ctx, sd, error);
@@ -634,7 +634,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
}
@@ -684,8 +684,8 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked");
} else {
gpr_mu_unlock(&p->mu);
- grpc_exec_ctx_sched(exec_ctx, closure,
- GRPC_ERROR_CREATE("Round Robin not connected"), NULL);
+ grpc_closure_sched(exec_ctx, closure,
+ GRPC_ERROR_CREATE("Round Robin not connected"));
}
}
@@ -749,7 +749,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
}
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
- rr_connectivity_changed, sd);
+ rr_connectivity_changed, sd, grpc_schedule_on_exec_ctx);
}
}
if (subchannel_idx == 0) {
diff --git a/src/core/ext/load_reporting/load_reporting.c b/src/core/ext/load_reporting/load_reporting.c
index df1ea0ec9a..37b06a737f 100644
--- a/src/core/ext/load_reporting/load_reporting.c
+++ b/src/core/ext/load_reporting/load_reporting.c
@@ -53,7 +53,8 @@ static bool is_load_reporting_enabled(const grpc_channel_args *a) {
return false;
}
-static bool maybe_add_load_reporting_filter(grpc_channel_stack_builder *builder,
+static bool maybe_add_load_reporting_filter(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder,
void *arg) {
const grpc_channel_args *args =
grpc_channel_stack_builder_get_channel_arguments(builder);
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index 18bb826948..07ef10e6a8 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -68,7 +68,8 @@ typedef struct {
grpc_exec_ctx *exec_ctx;
} recv_md_filter_args;
-static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *recv_md_filter(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_mdelem *md) {
recv_md_filter_args *a = user_data;
grpc_call_element *elem = a->elem;
call_data *calld = elem->call_data;
@@ -92,8 +93,8 @@ static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data,
recv_md_filter_args a;
a.elem = elem;
a.exec_ctx = exec_ctx;
- grpc_metadata_batch_filter(calld->recv_initial_metadata, recv_md_filter,
- &a);
+ grpc_metadata_batch_filter(exec_ctx, calld->recv_initial_metadata,
+ recv_md_filter, &a);
if (calld->service_method == NULL) {
err =
grpc_error_add_child(err, GRPC_ERROR_CREATE("Missing :path header"));
@@ -114,7 +115,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
memset(calld, 0, sizeof(call_data));
calld->id = (intptr_t)args->call_stack;
- grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem);
+ grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem,
+ grpc_schedule_on_exec_ctx);
/* TODO(dgq): do something with the data
channel_data *chand = elem->channel_data;
@@ -191,7 +193,8 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
*/
}
-static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *lr_trailing_md_filter(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
@@ -215,7 +218,7 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
op->recv_initial_metadata_ready = &calld->on_initial_md_ready;
} else if (op->send_trailing_metadata) {
- grpc_metadata_batch_filter(op->send_trailing_metadata,
+ grpc_metadata_batch_filter(exec_ctx, op->send_trailing_metadata,
lr_trailing_md_filter, elem);
}
grpc_call_next_op(exec_ctx, elem, op);
diff --git a/src/core/ext/resolver/README.md b/src/core/ext/resolver/README.md
new file mode 100644
index 0000000000..b0e234e96a
--- /dev/null
+++ b/src/core/ext/resolver/README.md
@@ -0,0 +1,4 @@
+# Resolver
+
+Implementations of various name resolution schemes.
+See the [naming spec](/doc/naming.md).
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 2675fa931f..bb2b012507 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -81,6 +81,7 @@ typedef struct {
/** retry timer */
bool have_retry_timer;
grpc_timer retry_timer;
+ grpc_closure on_retry;
/** retry backoff state */
gpr_backoff backoff_state;
@@ -112,8 +113,8 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
}
if (r->next_completion != NULL) {
*r->target_result = NULL;
- grpc_exec_ctx_sched(exec_ctx, r->next_completion,
- GRPC_ERROR_CREATE("Resolver Shutdown"), NULL);
+ grpc_closure_sched(exec_ctx, r->next_completion,
+ GRPC_ERROR_CREATE("Resolver Shutdown"));
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -182,7 +183,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_arg new_arg = grpc_lb_addresses_create_channel_arg(addresses);
result = grpc_channel_args_copy_and_add(r->channel_args, &new_arg, 1);
grpc_resolved_addresses_destroy(r->addresses);
- grpc_lb_addresses_destroy(addresses);
+ grpc_lb_addresses_destroy(exec_ctx, addresses);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
@@ -199,11 +200,12 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
} else {
gpr_log(GPR_DEBUG, "retrying immediately");
}
- grpc_timer_init(exec_ctx, &r->retry_timer, next_try, dns_on_retry_timer, r,
- now);
+ grpc_closure_init(&r->on_retry, dns_on_retry_timer, r,
+ grpc_schedule_on_exec_ctx);
+ grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now);
}
if (r->resolved_result != NULL) {
- grpc_channel_args_destroy(r->resolved_result);
+ grpc_channel_args_destroy(exec_ctx, r->resolved_result);
}
r->resolved_result = result;
r->resolved_version++;
@@ -219,9 +221,10 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(!r->resolving);
r->resolving = true;
r->addresses = NULL;
- grpc_resolve_address(exec_ctx, r->name_to_resolve, r->default_port,
- r->interested_parties,
- grpc_closure_create(dns_on_resolved, r), &r->addresses);
+ grpc_resolve_address(
+ exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties,
+ grpc_closure_create(dns_on_resolved, r, grpc_schedule_on_exec_ctx),
+ &r->addresses);
}
static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
@@ -231,7 +234,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
*r->target_result = r->resolved_result == NULL
? NULL
: grpc_channel_args_copy(r->resolved_result);
- grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
@@ -241,12 +244,12 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
dns_resolver *r = (dns_resolver *)gr;
gpr_mu_destroy(&r->mu);
if (r->resolved_result != NULL) {
- grpc_channel_args_destroy(r->resolved_result);
+ grpc_channel_args_destroy(exec_ctx, r->resolved_result);
}
grpc_pollset_set_destroy(r->interested_parties);
gpr_free(r->name_to_resolve);
gpr_free(r->default_port);
- grpc_channel_args_destroy(r->channel_args);
+ grpc_channel_args_destroy(exec_ctx, r->channel_args);
gpr_free(r);
}
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 88808c674f..c146a627cb 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -47,6 +47,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
@@ -89,7 +90,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_result = NULL;
- grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -123,7 +124,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
grpc_arg arg = grpc_lb_addresses_create_channel_arg(r->addresses);
*r->target_result =
grpc_channel_args_copy_and_add(r->channel_args, &arg, 1);
- grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
}
}
@@ -131,8 +132,8 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
- grpc_lb_addresses_destroy(r->addresses);
- grpc_channel_args_destroy(r->channel_args);
+ grpc_lb_addresses_destroy(exec_ctx, r->addresses);
+ grpc_channel_args_destroy(exec_ctx, r->channel_args);
gpr_free(r);
}
@@ -161,7 +162,8 @@ char *unix_get_default_authority(grpc_resolver_factory *factory,
static void do_nothing(void *ignored) {}
-static grpc_resolver *sockaddr_create(grpc_resolver_args *args,
+static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx,
+ grpc_resolver_args *args,
int parse(grpc_uri *uri,
grpc_resolved_address *dst)) {
if (0 != strcmp(args->uri->authority, "")) {
@@ -188,10 +190,10 @@ static grpc_resolver *sockaddr_create(grpc_resolver_args *args,
gpr_free(part_str);
if (errors_found) break;
}
- grpc_slice_buffer_destroy(&path_parts);
- grpc_slice_unref(path_slice);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &path_parts);
+ grpc_slice_unref_internal(exec_ctx, path_slice);
if (errors_found) {
- grpc_lb_addresses_destroy(addresses);
+ grpc_lb_addresses_destroy(exec_ctx, addresses);
return NULL;
}
/* Instantiate resolver. */
@@ -216,7 +218,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *name##_factory_create_resolver( \
grpc_exec_ctx *exec_ctx, grpc_resolver_factory *factory, \
grpc_resolver_args *args) { \
- return sockaddr_create(args, parse_##name); \
+ return sockaddr_create(exec_ctx, args, parse_##name); \
} \
static const grpc_resolver_factory_vtable name##_factory_vtable = { \
sockaddr_factory_ref, sockaddr_factory_unref, \
diff --git a/src/core/ext/transport/README.md b/src/core/ext/transport/README.md
new file mode 100644
index 0000000000..2290568784
--- /dev/null
+++ b/src/core/ext/transport/README.md
@@ -0,0 +1 @@
+Transports for gRPC
diff --git a/src/core/ext/transport/chttp2/README.md b/src/core/ext/transport/chttp2/README.md
new file mode 100644
index 0000000000..8880a47460
--- /dev/null
+++ b/src/core/ext/transport/chttp2/README.md
@@ -0,0 +1 @@
+CHTTP2 - gRPC's implementation of a HTTP2 based transport
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c
index 114bb07222..2c5dfaea60 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.c
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c
@@ -46,8 +46,9 @@
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/iomgr/tcp_client.h"
-#include "src/core/lib/security/transport/security_connector.h"
+#include "src/core/lib/slice/slice_internal.h"
typedef struct {
grpc_connector base;
@@ -58,9 +59,6 @@ typedef struct {
bool shutdown;
bool connecting;
- grpc_chttp2_add_handshakers_func add_handshakers;
- void *add_handshakers_user_data;
-
grpc_closure *notify;
grpc_connect_in_args args;
grpc_connect_out_args *result;
@@ -124,8 +122,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
// point this can be removed.
grpc_endpoint_shutdown(exec_ctx, args->endpoint);
grpc_endpoint_destroy(exec_ctx, args->endpoint);
- grpc_channel_args_destroy(args->args);
- grpc_slice_buffer_destroy(args->read_buffer);
+ grpc_channel_args_destroy(exec_ctx, args->args);
+ grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
gpr_free(args->read_buffer);
} else {
error = GRPC_ERROR_REF(error);
@@ -141,7 +139,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
}
grpc_closure *notify = c->notify;
c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ grpc_closure_sched(exec_ctx, notify, error);
grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
c->handshake_mgr = NULL;
gpr_mu_unlock(&c->mu);
@@ -151,16 +149,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
static void start_handshake_locked(grpc_exec_ctx *exec_ctx,
chttp2_connector *c) {
c->handshake_mgr = grpc_handshake_manager_create();
- char *proxy_name = grpc_get_http_proxy_server();
- if (proxy_name != NULL) {
- grpc_handshake_manager_add(c->handshake_mgr,
- grpc_http_connect_handshaker_create(proxy_name));
- gpr_free(proxy_name);
- }
- if (c->add_handshakers != NULL) {
- c->add_handshakers(exec_ctx, c->add_handshakers_user_data,
+ grpc_handshakers_add(exec_ctx, HANDSHAKER_CLIENT, c->args.channel_args,
c->handshake_mgr);
- }
grpc_handshake_manager_do_handshake(
exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args,
c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
@@ -180,7 +170,7 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
memset(c->result, 0, sizeof(*c->result));
grpc_closure *notify = c->notify;
c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ grpc_closure_sched(exec_ctx, notify, error);
gpr_mu_unlock(&c->mu);
chttp2_connector_unref(exec_ctx, arg);
} else {
@@ -203,7 +193,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
memset(c->result, 0, sizeof(*c->result));
grpc_closure *notify = c->notify;
c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ grpc_closure_sched(exec_ctx, notify, error);
if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint);
gpr_mu_unlock(&c->mu);
chttp2_connector_unref(exec_ctx, arg);
@@ -211,7 +201,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GPR_ASSERT(c->endpoint != NULL);
if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
- c);
+ c, grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&c->initial_string_buffer);
grpc_slice_buffer_add(&c->initial_string_buffer,
c->args.initial_connect_string);
@@ -237,7 +227,7 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
c->result = result;
GPR_ASSERT(c->endpoint == NULL);
chttp2_connector_ref(con); // Ref taken for callback.
- grpc_closure_init(&c->connected, connected, c);
+ grpc_closure_init(&c->connected, connected, c, grpc_schedule_on_exec_ctx);
GPR_ASSERT(!c->connecting);
c->connecting = true;
grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint,
@@ -250,15 +240,11 @@ static const grpc_connector_vtable chttp2_connector_vtable = {
chttp2_connector_ref, chttp2_connector_unref, chttp2_connector_shutdown,
chttp2_connector_connect};
-grpc_connector *grpc_chttp2_connector_create(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_add_handshakers_func add_handshakers,
- void *add_handshakers_user_data) {
+grpc_connector *grpc_chttp2_connector_create() {
chttp2_connector *c = gpr_malloc(sizeof(*c));
memset(c, 0, sizeof(*c));
c->base.vtable = &chttp2_connector_vtable;
gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1);
- c->add_handshakers = add_handshakers;
- c->add_handshakers_user_data = add_handshakers_user_data;
return &c->base;
}
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h
index 58eba22417..f5d1025432 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.h
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h
@@ -35,17 +35,7 @@
#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H
#include "src/core/ext/client_channel/connector.h"
-#include "src/core/lib/channel/handshaker.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
-typedef void (*grpc_chttp2_add_handshakers_func)(
- grpc_exec_ctx* exec_ctx, void* user_data,
- grpc_handshake_manager* handshake_mgr);
-
-/// If \a add_handshakers is non-NULL, it will be called with
-/// \a add_handshakers_user_data to add handshakers.
-grpc_connector* grpc_chttp2_connector_create(
- grpc_exec_ctx* exec_ctx, grpc_chttp2_add_handshakers_func add_handshakers,
- void* add_handshakers_user_data);
+grpc_connector* grpc_chttp2_connector_create();
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index a0d0652ce7..c9f4021216 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -53,8 +53,7 @@ static void client_channel_factory_unref(
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const grpc_subchannel_args *args) {
- grpc_connector *connector = grpc_chttp2_connector_create(
- exec_ctx, NULL /* add_handshakers */, NULL /* user_data */);
+ grpc_connector *connector = grpc_chttp2_connector_create();
grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args);
grpc_connector_unref(exec_ctx, connector);
return s;
@@ -72,7 +71,7 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args,
GRPC_CLIENT_CHANNEL, NULL);
- grpc_channel_args_destroy(new_args);
+ grpc_channel_args_destroy(exec_ctx, new_args);
return channel;
}
@@ -96,17 +95,16 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
"grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3,
(target, args, reserved));
GPR_ASSERT(reserved == NULL);
- grpc_client_channel_factory *factory =
- (grpc_client_channel_factory *)&client_channel_factory;
// Add channel arg containing the client channel factory.
- grpc_arg arg = grpc_client_channel_factory_create_channel_arg(factory);
+ grpc_arg arg =
+ grpc_client_channel_factory_create_channel_arg(&client_channel_factory);
grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
// Create channel.
grpc_channel *channel = client_channel_factory_create_channel(
- &exec_ctx, factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
+ &exec_ctx, &client_channel_factory, target,
+ GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
// Clean up.
- grpc_channel_args_destroy(new_args);
- grpc_client_channel_factory_unref(&exec_ctx, factory);
+ grpc_channel_args_destroy(&exec_ctx, new_args);
grpc_exec_ctx_finish(&exec_ctx);
return channel != NULL ? channel : grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL,
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
index 1e5b1c22e3..f1069e2c57 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
@@ -74,7 +74,7 @@ grpc_channel *grpc_insecure_channel_create_from_fd(
GPR_ASSERT(transport);
grpc_channel *channel = grpc_channel_create(
&exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
- grpc_channel_args_destroy(final_args);
+ grpc_channel_args_destroy(&exec_ctx, final_args);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL);
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index f35439cd44..f979d9bad5 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -46,40 +46,16 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
-typedef struct {
- grpc_client_channel_factory base;
- gpr_refcount refs;
- grpc_channel_security_connector *security_connector;
-} client_channel_factory;
-
static void client_channel_factory_ref(
- grpc_client_channel_factory *cc_factory) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
- gpr_ref(&f->refs);
-}
+ grpc_client_channel_factory *cc_factory) {}
static void client_channel_factory_unref(
- grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
- if (gpr_unref(&f->refs)) {
- GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
- "client_channel_factory");
- gpr_free(f);
- }
-}
-
-static void add_handshakers(grpc_exec_ctx *exec_ctx, void *security_connector,
- grpc_handshake_manager *handshake_mgr) {
- grpc_channel_security_connector_add_handshakers(exec_ctx, security_connector,
- handshake_mgr);
-}
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {}
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const grpc_subchannel_args *args) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
- grpc_connector *connector = grpc_chttp2_connector_create(
- exec_ctx, add_handshakers, f->security_connector);
+ grpc_connector *connector = grpc_chttp2_connector_create();
grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args);
grpc_connector_unref(exec_ctx, connector);
return s;
@@ -97,7 +73,7 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args,
GRPC_CLIENT_CHANNEL, NULL);
- grpc_channel_args_destroy(new_args);
+ grpc_channel_args_destroy(exec_ctx, new_args);
return channel;
}
@@ -106,6 +82,9 @@ static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
client_channel_factory_create_subchannel,
client_channel_factory_create_channel};
+static grpc_client_channel_factory client_channel_factory = {
+ &client_channel_factory_vtable};
+
/* Create a secure client channel:
Asynchronously: - resolve target
- connect to it (trying alternatives as presented)
@@ -132,39 +111,32 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
grpc_channel_security_connector *security_connector;
grpc_channel_args *new_args_from_connector;
if (grpc_channel_credentials_create_security_connector(
- creds, target, args, &security_connector, &new_args_from_connector) !=
- GRPC_SECURITY_OK) {
+ &exec_ctx, creds, target, args, &security_connector,
+ &new_args_from_connector) != GRPC_SECURITY_OK) {
grpc_exec_ctx_finish(&exec_ctx);
return grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL, "Failed to create security connector.");
}
- // Create client channel factory.
- client_channel_factory *f = gpr_malloc(sizeof(*f));
- memset(f, 0, sizeof(*f));
- f->base.vtable = &client_channel_factory_vtable;
- gpr_ref_init(&f->refs, 1);
- GRPC_SECURITY_CONNECTOR_REF(&security_connector->base,
- "grpc_secure_channel_create");
- f->security_connector = security_connector;
// Add channel args containing the client channel factory and security
// connector.
- grpc_arg new_args[2];
- new_args[0] = grpc_client_channel_factory_create_channel_arg(&f->base);
- new_args[1] = grpc_security_connector_to_arg(&security_connector->base);
- grpc_channel_args *args_copy = grpc_channel_args_copy_and_add(
+ grpc_arg args_to_add[2];
+ args_to_add[0] =
+ grpc_client_channel_factory_create_channel_arg(&client_channel_factory);
+ args_to_add[1] = grpc_security_connector_to_arg(&security_connector->base);
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add(
new_args_from_connector != NULL ? new_args_from_connector : args,
- new_args, GPR_ARRAY_SIZE(new_args));
+ args_to_add, GPR_ARRAY_SIZE(args_to_add));
if (new_args_from_connector != NULL) {
- grpc_channel_args_destroy(new_args_from_connector);
+ grpc_channel_args_destroy(&exec_ctx, new_args_from_connector);
}
// Create channel.
grpc_channel *channel = client_channel_factory_create_channel(
- &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, args_copy);
+ &exec_ctx, &client_channel_factory, target,
+ GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
// Clean up.
- GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
+ GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &security_connector->base,
"secure_client_channel_factory_create_channel");
- grpc_channel_args_destroy(args_copy);
- grpc_client_channel_factory_unref(&exec_ctx, &f->base);
+ grpc_channel_args_destroy(&exec_ctx, new_args);
grpc_exec_ctx_finish(&exec_ctx);
return channel; /* may be NULL */
}
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c
index f0857714fc..574d1a7710 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.c
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.c
@@ -46,31 +46,15 @@
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/tcp_server.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
-void grpc_chttp2_server_handshaker_factory_add_handshakers(
- grpc_exec_ctx *exec_ctx,
- grpc_chttp2_server_handshaker_factory *handshaker_factory,
- grpc_handshake_manager *handshake_mgr) {
- if (handshaker_factory != NULL) {
- handshaker_factory->vtable->add_handshakers(exec_ctx, handshaker_factory,
- handshake_mgr);
- }
-}
-
-void grpc_chttp2_server_handshaker_factory_destroy(
- grpc_exec_ctx *exec_ctx,
- grpc_chttp2_server_handshaker_factory *handshaker_factory) {
- if (handshaker_factory != NULL) {
- handshaker_factory->vtable->destroy(exec_ctx, handshaker_factory);
- }
-}
-
typedef struct pending_handshake_manager_node {
grpc_handshake_manager *handshake_mgr;
struct pending_handshake_manager_node *next;
@@ -80,7 +64,6 @@ typedef struct {
grpc_server *server;
grpc_tcp_server *tcp_server;
grpc_channel_args *args;
- grpc_chttp2_server_handshaker_factory *handshaker_factory;
gpr_mu mu;
bool shutdown;
grpc_closure tcp_server_shutdown_complete;
@@ -148,8 +131,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
// point this can be removed.
grpc_endpoint_shutdown(exec_ctx, args->endpoint);
grpc_endpoint_destroy(exec_ctx, args->endpoint);
- grpc_channel_args_destroy(args->args);
- grpc_slice_buffer_destroy(args->read_buffer);
+ grpc_channel_args_destroy(exec_ctx, args->args);
+ grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
gpr_free(args->read_buffer);
}
} else {
@@ -164,7 +147,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
connection_state->accepting_pollset, args->args);
grpc_chttp2_transport_start_reading(exec_ctx, transport,
args->read_buffer);
- grpc_channel_args_destroy(args->args);
+ grpc_channel_args_destroy(exec_ctx, args->args);
}
}
pending_handshake_manager_remove_locked(connection_state->server_state,
@@ -197,8 +180,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
connection_state->accepting_pollset = accepting_pollset;
connection_state->acceptor = acceptor;
connection_state->handshake_mgr = handshake_mgr;
- grpc_chttp2_server_handshaker_factory_add_handshakers(
- exec_ctx, state->handshaker_factory, connection_state->handshake_mgr);
+ grpc_handshakers_add(exec_ctx, HANDSHAKER_SERVER, state->args,
+ connection_state->handshake_mgr);
// TODO(roth): We should really get this timeout value from channel
// args instead of hard-coding it.
const gpr_timespec deadline = gpr_time_add(
@@ -232,13 +215,11 @@ static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg,
// Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref.
grpc_exec_ctx_flush(exec_ctx);
- grpc_chttp2_server_handshaker_factory_destroy(exec_ctx,
- state->handshaker_factory);
if (destroy_done != NULL) {
destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error));
grpc_exec_ctx_flush(exec_ctx);
}
- grpc_channel_args_destroy(state->args);
+ grpc_channel_args_destroy(exec_ctx, state->args);
gpr_mu_destroy(&state->mu);
gpr_free(state);
}
@@ -258,10 +239,10 @@ static void server_destroy_listener(grpc_exec_ctx *exec_ctx,
grpc_tcp_server_unref(exec_ctx, tcp_server);
}
-grpc_error *grpc_chttp2_server_add_port(
- grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr,
- grpc_channel_args *args,
- grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num) {
+grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx,
+ grpc_server *server, const char *addr,
+ grpc_channel_args *args,
+ int *port_num) {
grpc_resolved_addresses *resolved = NULL;
grpc_tcp_server *tcp_server = NULL;
size_t i;
@@ -281,7 +262,8 @@ grpc_error *grpc_chttp2_server_add_port(
state = gpr_malloc(sizeof(*state));
memset(state, 0, sizeof(*state));
grpc_closure_init(&state->tcp_server_shutdown_complete,
- tcp_server_shutdown_complete, state);
+ tcp_server_shutdown_complete, state,
+ grpc_schedule_on_exec_ctx);
err = grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete,
args, &tcp_server);
if (err != GRPC_ERROR_NONE) {
@@ -291,7 +273,6 @@ grpc_error *grpc_chttp2_server_add_port(
state->server = server;
state->tcp_server = tcp_server;
state->args = args;
- state->handshaker_factory = handshaker_factory;
state->shutdown = true;
gpr_mu_init(&state->mu);
@@ -345,8 +326,7 @@ error:
if (tcp_server) {
grpc_tcp_server_unref(exec_ctx, tcp_server);
} else {
- grpc_channel_args_destroy(args);
- grpc_chttp2_server_handshaker_factory_destroy(exec_ctx, handshaker_factory);
+ grpc_channel_args_destroy(exec_ctx, args);
gpr_free(state);
}
*port_num = 0;
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h
index aa364b565d..2581ebaae9 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.h
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.h
@@ -36,43 +36,12 @@
#include <grpc/impl/codegen/grpc_types.h>
-#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/iomgr/exec_ctx.h"
-/// A server handshaker factory is used to create handshakers for server
-/// connections.
-typedef struct grpc_chttp2_server_handshaker_factory
- grpc_chttp2_server_handshaker_factory;
-
-typedef struct {
- void (*add_handshakers)(
- grpc_exec_ctx *exec_ctx,
- grpc_chttp2_server_handshaker_factory *handshaker_factory,
- grpc_handshake_manager *handshake_mgr);
- void (*destroy)(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_server_handshaker_factory *handshaker_factory);
-} grpc_chttp2_server_handshaker_factory_vtable;
-
-struct grpc_chttp2_server_handshaker_factory {
- const grpc_chttp2_server_handshaker_factory_vtable *vtable;
-};
-
-void grpc_chttp2_server_handshaker_factory_add_handshakers(
- grpc_exec_ctx *exec_ctx,
- grpc_chttp2_server_handshaker_factory *handshaker_factory,
- grpc_handshake_manager *handshake_mgr);
-
-void grpc_chttp2_server_handshaker_factory_destroy(
- grpc_exec_ctx *exec_ctx,
- grpc_chttp2_server_handshaker_factory *handshaker_factory);
-
/// Adds a port to \a server. Sets \a port_num to the port number.
-/// If \a handshaker_factory is not NULL, it will be used to create
-/// handshakers for the port.
-/// Takes ownership of \a args and \a handshaker_factory.
-grpc_error *grpc_chttp2_server_add_port(
- grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr,
- grpc_channel_args *args,
- grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num);
+/// Takes ownership of \a args.
+grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx,
+ grpc_server *server, const char *addr,
+ grpc_channel_args *args, int *port_num);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H */
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index 7e286d4e46..bf5026bea6 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -47,8 +47,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
(server, addr));
grpc_error *err = grpc_chttp2_server_add_port(
&exec_ctx, server, addr,
- grpc_channel_args_copy(grpc_server_get_channel_args(server)),
- NULL /* handshaker_factory */, &port_num);
+ grpc_channel_args_copy(grpc_server_get_channel_args(server)), &port_num);
if (err != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "%s", msg);
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
index aa2ecf5743..f46e849932 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
@@ -62,7 +62,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
grpc_endpoint *server_endpoint =
grpc_tcp_create(grpc_fd_create(fd, name), resource_quota,
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name);
- grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
gpr_free(name);
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index a33a7a3f7d..395c79a71d 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -49,34 +49,6 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
-typedef struct {
- grpc_chttp2_server_handshaker_factory base;
- grpc_server_security_connector *security_connector;
-} server_security_handshaker_factory;
-
-static void server_security_handshaker_factory_add_handshakers(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf,
- grpc_handshake_manager *handshake_mgr) {
- server_security_handshaker_factory *handshaker_factory =
- (server_security_handshaker_factory *)hf;
- grpc_server_security_connector_add_handshakers(
- exec_ctx, handshaker_factory->security_connector, handshake_mgr);
-}
-
-static void server_security_handshaker_factory_destroy(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf) {
- server_security_handshaker_factory *handshaker_factory =
- (server_security_handshaker_factory *)hf;
- GRPC_SECURITY_CONNECTOR_UNREF(&handshaker_factory->security_connector->base,
- "server");
- gpr_free(hf);
-}
-
-static const grpc_chttp2_server_handshaker_factory_vtable
- server_security_handshaker_factory_vtable = {
- server_security_handshaker_factory_add_handshakers,
- server_security_handshaker_factory_destroy};
-
int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
grpc_server_credentials *creds) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -94,7 +66,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
goto done;
}
grpc_security_status status =
- grpc_server_credentials_create_security_connector(creds, &sc);
+ grpc_server_credentials_create_security_connector(&exec_ctx, creds, &sc);
if (status != GRPC_SECURITY_OK) {
char *msg;
gpr_asprintf(&msg,
@@ -105,20 +77,19 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
gpr_free(msg);
goto done;
}
- // Create handshaker factory.
- server_security_handshaker_factory *handshaker_factory =
- gpr_malloc(sizeof(*handshaker_factory));
- memset(handshaker_factory, 0, sizeof(*handshaker_factory));
- handshaker_factory->base.vtable = &server_security_handshaker_factory_vtable;
- handshaker_factory->security_connector = sc;
// Create channel args.
- grpc_arg channel_arg = grpc_server_credentials_to_arg(creds);
- grpc_channel_args *args = grpc_channel_args_copy_and_add(
- grpc_server_get_channel_args(server), &channel_arg, 1);
+ grpc_arg args_to_add[2];
+ args_to_add[0] = grpc_server_credentials_to_arg(creds);
+ args_to_add[1] = grpc_security_connector_to_arg(&sc->base);
+ grpc_channel_args *args =
+ grpc_channel_args_copy_and_add(grpc_server_get_channel_args(server),
+ args_to_add, GPR_ARRAY_SIZE(args_to_add));
// Add server port.
- err = grpc_chttp2_server_add_port(&exec_ctx, server, addr, args,
- &handshaker_factory->base, &port_num);
+ err = grpc_chttp2_server_add_port(&exec_ctx, server, addr, args, &port_num);
done:
+ if (sc != NULL) {
+ GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &sc->base, "server");
+ }
grpc_exec_ctx_finish(&exec_ctx);
if (err != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(err);
diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.c b/src/core/ext/transport/chttp2/transport/bin_decoder.c
index 3eef80b557..8db36e4a7f 100644
--- a/src/core/ext/transport/chttp2/transport/bin_decoder.c
+++ b/src/core/ext/transport/chttp2/transport/bin_decoder.c
@@ -34,6 +34,7 @@
#include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
@@ -143,7 +144,8 @@ bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx) {
return true;
}
-grpc_slice grpc_chttp2_base64_decode(grpc_slice input) {
+grpc_slice grpc_chttp2_base64_decode(grpc_exec_ctx *exec_ctx,
+ grpc_slice input) {
size_t input_length = GRPC_SLICE_LENGTH(input);
size_t output_length = input_length / 4 * 3;
struct grpc_base64_decode_context ctx;
@@ -179,7 +181,7 @@ grpc_slice grpc_chttp2_base64_decode(grpc_slice input) {
char *s = grpc_dump_slice(input, GPR_DUMP_ASCII);
gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s);
gpr_free(s);
- grpc_slice_unref(output);
+ grpc_slice_unref_internal(exec_ctx, output);
return gpr_empty_slice();
}
GPR_ASSERT(ctx.output_cur == GRPC_SLICE_END_PTR(output));
@@ -187,7 +189,8 @@ grpc_slice grpc_chttp2_base64_decode(grpc_slice input) {
return output;
}
-grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input,
+grpc_slice grpc_chttp2_base64_decode_with_length(grpc_exec_ctx *exec_ctx,
+ grpc_slice input,
size_t output_length) {
size_t input_length = GRPC_SLICE_LENGTH(input);
grpc_slice output = grpc_slice_malloc(output_length);
@@ -200,7 +203,7 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input,
"grpc_chttp2_base64_decode_with_length has a length of %d, which "
"has a tail of 1 byte.\n",
(int)input_length);
- grpc_slice_unref(output);
+ grpc_slice_unref_internal(exec_ctx, output);
return gpr_empty_slice();
}
@@ -210,7 +213,7 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input,
"than the max possible output length %d.\n",
(int)output_length,
(int)(input_length / 4 * 3 + tail_xtra[input_length % 4]));
- grpc_slice_unref(output);
+ grpc_slice_unref_internal(exec_ctx, output);
return gpr_empty_slice();
}
@@ -224,7 +227,7 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input,
char *s = grpc_dump_slice(input, GPR_DUMP_ASCII);
gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s);
gpr_free(s);
- grpc_slice_unref(output);
+ grpc_slice_unref_internal(exec_ctx, output);
return gpr_empty_slice();
}
GPR_ASSERT(ctx.output_cur == GRPC_SLICE_END_PTR(output));
diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.h b/src/core/ext/transport/chttp2/transport/bin_decoder.h
index 83a90be519..48ffb2ae3b 100644
--- a/src/core/ext/transport/chttp2/transport/bin_decoder.h
+++ b/src/core/ext/transport/chttp2/transport/bin_decoder.h
@@ -55,12 +55,13 @@ bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx);
/* base64 decode a slice with pad chars. Returns a new slice, does not take
ownership of the input. Returns an empty slice if decoding is failed. */
-grpc_slice grpc_chttp2_base64_decode(grpc_slice input);
+grpc_slice grpc_chttp2_base64_decode(grpc_exec_ctx *exec_ctx, grpc_slice input);
/* base64 decode a slice without pad chars, data length is needed. Returns a new
slice, does not take ownership of the input. Returns an empty slice if
decoding is failed. */
-grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input,
+grpc_slice grpc_chttp2_base64_decode_with_length(grpc_exec_ctx *exec_ctx,
+ grpc_slice input,
size_t output_length);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H */
diff --git a/src/core/ext/transport/chttp2/transport/bin_encoder.h b/src/core/ext/transport/chttp2/transport/bin_encoder.h
index 9e143b46e2..477559d0e2 100644
--- a/src/core/ext/transport/chttp2/transport/bin_encoder.h
+++ b/src/core/ext/transport/chttp2/transport/bin_encoder.h
@@ -47,7 +47,7 @@ grpc_slice grpc_chttp2_huffman_compress(grpc_slice input);
/* equivalent to:
grpc_slice x = grpc_chttp2_base64_encode(input);
grpc_slice y = grpc_chttp2_huffman_compress(x);
- grpc_slice_unref(x);
+ grpc_slice_unref_internal(exec_ctx, x);
return y; */
grpc_slice grpc_chttp2_base64_encode_and_huffman_compress_impl(
grpc_slice input);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 6bc054866b..8fe12a2690 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -47,10 +47,12 @@
#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
+#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -73,20 +75,14 @@ static const grpc_transport_vtable vtable;
static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void write_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
-static void write_action_end(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
-static void read_action_begin(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
static void read_action_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
grpc_error *error);
-static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
- grpc_error *error);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_setting_id id, uint32_t value);
@@ -112,12 +108,8 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored);
-static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
-static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
@@ -141,13 +133,13 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_endpoint_destroy(exec_ctx, t->ep);
- grpc_slice_buffer_destroy(&t->qbuf);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &t->qbuf);
- grpc_slice_buffer_destroy(&t->outbuf);
- grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &t->outbuf);
+ grpc_chttp2_hpack_compressor_destroy(exec_ctx, &t->hpack_compressor);
- grpc_slice_buffer_destroy(&t->read_buffer);
- grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &t->read_buffer);
+ grpc_chttp2_hpack_parser_destroy(exec_ctx, &t->hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
@@ -166,8 +158,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
and maybe they hold resources that need to be freed */
while (t->pings.next != &t->pings) {
grpc_chttp2_outstanding_ping *ping = t->pings.next;
- grpc_exec_ctx_sched(exec_ctx, ping->on_recv,
- GRPC_ERROR_CREATE("Transport closed"), NULL);
+ grpc_closure_sched(exec_ctx, ping->on_recv,
+ GRPC_ERROR_CREATE("Transport closed"));
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -246,21 +238,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_slice_buffer_init(&t->outbuf);
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor);
- grpc_closure_init(&t->write_action_begin_locked, write_action_begin_locked,
- t);
- grpc_closure_init(&t->write_action, write_action, t);
- grpc_closure_init(&t->write_action_end, write_action_end, t);
- grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t);
- grpc_closure_init(&t->read_action_begin, read_action_begin, t);
- grpc_closure_init(&t->read_action_locked, read_action_locked, t);
- grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t);
- grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t);
- grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t);
+ grpc_closure_init(&t->write_action, write_action, t,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&t->read_action_locked, read_action_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
+ grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
grpc_closure_init(&t->destructive_reclaimer_locked,
- destructive_reclaimer_locked, t);
+ destructive_reclaimer_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
- grpc_chttp2_hpack_parser_init(&t->hpack_parser);
+ grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
grpc_slice_buffer_init(&t->read_buffer);
@@ -395,9 +384,10 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_combiner_execute(exec_ctx, t->combiner,
- grpc_closure_create(destroy_transport_locked, t),
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx, grpc_closure_create(
+ destroy_transport_locked, t,
+ grpc_combiner_scheduler(t->combiner, false)),
+ GRPC_ERROR_NONE);
}
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
@@ -471,8 +461,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_data_parser_init(&s->data_parser);
grpc_slice_buffer_init(&s->flow_controlled_buffer);
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
- grpc_closure_init(&s->complete_fetch, complete_fetch, s);
- grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s);
+ grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
+ grpc_schedule_on_exec_ctx);
GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
@@ -527,9 +517,11 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_ASSERT(s->recv_message_ready == NULL);
GPR_ASSERT(s->recv_trailing_metadata_finished == NULL);
grpc_chttp2_data_parser_destroy(exec_ctx, &s->data_parser);
- grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[0]);
- grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[1]);
- grpc_slice_buffer_destroy(&s->flow_controlled_buffer);
+ grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx,
+ &s->metadata_buffer[0]);
+ grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx,
+ &s->metadata_buffer[1]);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer);
GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error);
@@ -547,9 +539,10 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
s->destroy_stream_arg = and_free_memory;
- grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s);
- grpc_combiner_execute(exec_ctx, t->combiner, &s->destroy_stream,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s,
+ grpc_combiner_scheduler(t->combiner, false)),
+ GRPC_ERROR_NONE);
GPR_TIMER_END("destroy_stream", 0);
}
@@ -600,7 +593,7 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
write_state_name(st), reason));
t->write_state = st;
if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &t->run_after_write, NULL);
+ grpc_closure_list_sched(exec_ctx, &t->run_after_write);
if (t->close_transport_on_writes_finished != NULL) {
grpc_error *err = t->close_transport_on_writes_finished;
t->close_transport_on_writes_finished = NULL;
@@ -618,9 +611,12 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_WRITE_STATE_IDLE:
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason);
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->combiner,
- &t->write_action_begin_locked,
- GRPC_ERROR_NONE, covered_by_poller);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &t->write_action_begin_locked, write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner, covered_by_poller)),
+ GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
set_write_state(
@@ -662,7 +658,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) {
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"begin writing");
- grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE);
} else {
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
"begin writing nothing");
@@ -674,19 +670,13 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) {
grpc_chttp2_transport *t = gt;
GPR_TIMER_BEGIN("write_action", 0);
- grpc_endpoint_write(exec_ctx, t->ep, &t->outbuf, &t->write_action_end);
+ grpc_endpoint_write(
+ exec_ctx, t->ep, &t->outbuf,
+ grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t,
+ grpc_combiner_scheduler(t->combiner, false)));
GPR_TIMER_END("write_action", 0);
}
-static void write_action_end(grpc_exec_ctx *exec_ctx, void *gt,
- grpc_error *error) {
- grpc_chttp2_transport *t = gt;
- GPR_TIMER_BEGIN("write_action_end", 0);
- grpc_combiner_execute(exec_ctx, t->combiner, &t->write_action_end_locked,
- GRPC_ERROR_REF(error), false);
- GPR_TIMER_END("write_action_end", 0);
-}
-
static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
GPR_TIMER_BEGIN("terminate_writing_with_lock", 0);
@@ -716,18 +706,24 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"continue writing [!covered]");
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->combiner,
- &t->write_action_begin_locked,
- GRPC_ERROR_NONE, false);
+ grpc_closure_run(
+ exec_ctx,
+ grpc_closure_init(
+ &t->write_action_begin_locked, write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner, false)),
+ GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"continue writing [covered]");
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->combiner,
- &t->write_action_begin_locked,
- GRPC_ERROR_NONE, true);
+ grpc_closure_run(
+ exec_ctx,
+ grpc_closure_init(&t->write_action_begin_locked,
+ write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner, true)),
+ GRPC_ERROR_NONE);
break;
}
@@ -760,7 +756,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx,
char *msg = grpc_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg));
- grpc_slice_unref(goaway_text);
+ grpc_slice_unref_internal(exec_ctx, goaway_text);
t->seen_goaway = 1;
/* lie: use transient failure from the transport to indicate goaway has been
* received */
@@ -965,15 +961,6 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
}
}
-static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
- grpc_error *error) {
- grpc_chttp2_stream *s = gs;
- grpc_chttp2_transport *t = s->t;
- grpc_combiner_execute(exec_ctx, t->combiner, &s->complete_fetch_locked,
- GRPC_ERROR_REF(error),
- s->complete_fetch_covered_by_poller);
-}
-
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id,
@@ -1009,7 +996,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_closure *on_complete = op->on_complete;
if (on_complete == NULL) {
- on_complete = grpc_closure_create(do_nothing, NULL);
+ on_complete =
+ grpc_closure_create(do_nothing, NULL, grpc_schedule_on_exec_ctx);
}
/* use final_data as a barrier until enqueue time; the inital counter is
@@ -1212,13 +1200,15 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_free(str);
}
- grpc_closure_init(&op->transport_private.closure, perform_stream_op_locked,
- op);
op->transport_private.args[0] = gt;
op->transport_private.args[1] = gs;
GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
- grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure,
- GRPC_ERROR_NONE, op->covered_by_poller);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &op->transport_private.closure, perform_stream_op_locked, op,
+ grpc_combiner_scheduler(t->combiner, op->covered_by_poller)),
+ GRPC_ERROR_NONE);
GPR_TIMER_END("perform_stream_op", 0);
}
@@ -1247,7 +1237,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_outstanding_ping *ping;
for (ping = t->pings.next; ping != &t->pings; ping = ping->next) {
if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
- grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -1285,7 +1275,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
if (op->send_goaway) {
send_goaway(exec_ctx, t,
grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
- grpc_slice_ref(*op->goaway_message));
+ grpc_slice_ref_internal(*op->goaway_message));
}
if (op->set_accept_stream) {
@@ -1321,11 +1311,12 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
char *msg = grpc_transport_op_string(op);
gpr_free(msg);
op->transport_private.args[0] = gt;
- grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked,
- op);
GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
- grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_init(&op->transport_private.closure,
+ perform_transport_op_locked, op,
+ grpc_combiner_scheduler(t->combiner, false)),
+ GRPC_ERROR_NONE);
}
/*******************************************************************************
@@ -1515,21 +1506,22 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
char status_string[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(status, status_string);
grpc_chttp2_incoming_metadata_buffer_add(
- &s->metadata_buffer[1],
- grpc_mdelem_from_metadata_strings(
- GRPC_MDSTR_GRPC_STATUS, grpc_mdstr_from_string(status_string)));
+ &s->metadata_buffer[1], grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_GRPC_STATUS,
+ grpc_mdstr_from_string(status_string)));
if (slice) {
grpc_chttp2_incoming_metadata_buffer_add(
&s->metadata_buffer[1],
grpc_mdelem_from_metadata_strings(
- GRPC_MDSTR_GRPC_MESSAGE,
- grpc_mdstr_from_slice(grpc_slice_ref(*slice))));
+ exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
+ grpc_mdstr_from_slice(exec_ctx,
+ grpc_slice_ref_internal(*slice))));
}
s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
if (slice) {
- grpc_slice_unref(*slice);
+ grpc_slice_unref_internal(exec_ctx, *slice);
}
}
@@ -1685,8 +1677,9 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (optional_message != NULL) {
size_t msg_len = strlen(optional_message);
- GPR_ASSERT(msg_len < 127);
- message_pfx = grpc_slice_malloc(15);
+ GPR_ASSERT(msg_len <= UINT32_MAX);
+ uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 0);
+ message_pfx = grpc_slice_malloc(14 + msg_len_len);
p = GRPC_SLICE_START_PTR(message_pfx);
*p++ = 0x40;
*p++ = 12; /* len(grpc-message) */
@@ -1702,7 +1695,9 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
*p++ = 'a';
*p++ = 'g';
*p++ = 'e';
- *p++ = (uint8_t)msg_len;
+ GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 0, 0, p,
+ (uint32_t)msg_len_len);
+ p += msg_len_len;
GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
len += (uint32_t)GRPC_SLICE_LENGTH(message_pfx);
len += (uint32_t)msg_len;
@@ -1801,19 +1796,6 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
* INPUT PROCESSING - PARSING
*/
-static void read_action_begin(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- /* Control flow:
- reading_action_locked ->
- (parse_unlocked -> post_parse_locked)? ->
- post_reading_action_locked */
- GPR_TIMER_BEGIN("reading_action", 0);
- grpc_chttp2_transport *t = tp;
- grpc_combiner_execute(exec_ctx, t->combiner, &t->read_action_locked,
- GRPC_ERROR_REF(error), false);
- GPR_TIMER_END("reading_action", 0);
-}
-
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_http_parser parser;
@@ -1910,10 +1892,11 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
keep_reading = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading");
}
- grpc_slice_buffer_reset_and_unref(&t->read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->read_buffer);
if (keep_reading) {
- grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_begin);
+ grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
+ &t->read_action_locked);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@@ -1964,7 +1947,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs) {
if (gpr_unref(&bs->refs)) {
GRPC_ERROR_UNREF(bs->error);
- grpc_slice_buffer_destroy(&bs->slices);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices);
gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs);
}
@@ -2050,10 +2033,12 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
bs->next_action.slice = slice;
bs->next_action.max_size_hint = max_size_hint;
bs->next_action.on_complete = on_complete;
- grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked,
- bs);
- grpc_combiner_execute(exec_ctx, bs->transport->combiner,
- &bs->next_action.closure, GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
+ grpc_combiner_scheduler(bs->transport->combiner, false)),
+ GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_next", 0);
return 0;
}
@@ -2075,10 +2060,12 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked,
- bs);
- grpc_combiner_execute(exec_ctx, bs->transport->combiner, &bs->destroy_action,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &bs->destroy_action, incoming_byte_stream_destroy_locked, bs,
+ grpc_combiner_scheduler(bs->transport->combiner, false)),
+ GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_destroy", 0);
}
@@ -2086,7 +2073,7 @@ static void incoming_byte_stream_publish_error(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error) {
GPR_ASSERT(error != GRPC_ERROR_NONE);
- grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
+ grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error));
bs->on_next = NULL;
GRPC_ERROR_UNREF(bs->error);
bs->error = error;
@@ -2103,7 +2090,7 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
if (bs->on_next != NULL) {
*bs->next = slice;
- grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE);
bs->on_next = NULL;
} else {
grpc_slice_buffer_add(&bs->slices, slice);
@@ -2171,7 +2158,7 @@ static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
grpc_resource_user_post_reclaimer(exec_ctx,
grpc_endpoint_get_resource_user(t->ep),
- false, &t->benign_reclaimer);
+ false, &t->benign_reclaimer_locked);
}
}
@@ -2182,24 +2169,10 @@ static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
grpc_resource_user_post_reclaimer(exec_ctx,
grpc_endpoint_get_resource_user(t->ep),
- true, &t->destructive_reclaimer);
+ true, &t->destructive_reclaimer_locked);
}
}
-static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_chttp2_transport *t = arg;
- grpc_combiner_execute(exec_ctx, t->combiner, &t->benign_reclaimer_locked,
- GRPC_ERROR_REF(error), false);
-}
-
-static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_chttp2_transport *t = arg;
- grpc_combiner_execute(exec_ctx, t->combiner, &t->destructive_reclaimer_locked,
- GRPC_ERROR_REF(error), false);
-}
-
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
@@ -2380,5 +2353,5 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
gpr_free(read_buffer);
}
- read_action_begin(exec_ctx, t, GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
index eb68fe3138..49a8326f62 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
@@ -48,6 +48,7 @@
#include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
#include "src/core/ext/transport/chttp2/transport/hpack_table.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/timeout_encoding.h"
@@ -183,7 +184,8 @@ static void evict_entry(grpc_chttp2_hpack_compressor *c) {
}
/* add an element to the decoder table */
-static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
+static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
+ grpc_mdelem *elem) {
uint32_t key_hash = elem->key->hash;
uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
uint32_t new_index = c->tail_remote_index + c->table_elems + 1;
@@ -227,12 +229,12 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
} else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] <
c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) {
/* not there: replace oldest */
- GRPC_MDELEM_UNREF(c->entries_elems[HASH_FRAGMENT_2(elem_hash)]);
+ GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_2(elem_hash)]);
c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
} else {
/* not there: replace oldest */
- GRPC_MDELEM_UNREF(c->entries_elems[HASH_FRAGMENT_3(elem_hash)]);
+ GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_3(elem_hash)]);
c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
}
@@ -251,11 +253,11 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index;
} else if (c->indices_keys[HASH_FRAGMENT_2(key_hash)] <
c->indices_keys[HASH_FRAGMENT_3(key_hash)]) {
- GRPC_MDSTR_UNREF(c->entries_keys[HASH_FRAGMENT_2(key_hash)]);
+ GRPC_MDSTR_UNREF(exec_ctx, c->entries_keys[HASH_FRAGMENT_2(key_hash)]);
c->entries_keys[HASH_FRAGMENT_2(key_hash)] = GRPC_MDSTR_REF(elem->key);
c->indices_keys[HASH_FRAGMENT_2(key_hash)] = new_index;
} else {
- GRPC_MDSTR_UNREF(c->entries_keys[HASH_FRAGMENT_3(key_hash)]);
+ GRPC_MDSTR_UNREF(exec_ctx, c->entries_keys[HASH_FRAGMENT_3(key_hash)]);
c->entries_keys[HASH_FRAGMENT_3(key_hash)] = GRPC_MDSTR_REF(elem->key);
c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index;
}
@@ -294,7 +296,7 @@ static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c,
add_tiny_header_data(st, len_pfx), len_pfx);
GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
- add_header_data(st, grpc_slice_ref(value_slice));
+ add_header_data(st, grpc_slice_ref_internal(value_slice));
}
static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c,
@@ -311,7 +313,7 @@ static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c,
add_tiny_header_data(st, len_pfx), len_pfx);
GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
- add_header_data(st, grpc_slice_ref(value_slice));
+ add_header_data(st, grpc_slice_ref_internal(value_slice));
}
static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c,
@@ -327,10 +329,10 @@ static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c,
*add_tiny_header_data(st, 1) = 0x40;
GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00,
add_tiny_header_data(st, len_key_len), len_key_len);
- add_header_data(st, grpc_slice_ref(elem->key->slice));
+ add_header_data(st, grpc_slice_ref_internal(elem->key->slice));
GRPC_CHTTP2_WRITE_VARINT(len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
- add_header_data(st, grpc_slice_ref(value_slice));
+ add_header_data(st, grpc_slice_ref_internal(value_slice));
}
static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c,
@@ -346,10 +348,10 @@ static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c,
*add_tiny_header_data(st, 1) = 0x00;
GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00,
add_tiny_header_data(st, len_key_len), len_key_len);
- add_header_data(st, grpc_slice_ref(elem->key->slice));
+ add_header_data(st, grpc_slice_ref_internal(elem->key->slice));
GRPC_CHTTP2_WRITE_VARINT(len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
- add_header_data(st, grpc_slice_ref(value_slice));
+ add_header_data(st, grpc_slice_ref_internal(value_slice));
}
static void emit_advertise_table_size_change(grpc_chttp2_hpack_compressor *c,
@@ -366,8 +368,8 @@ static uint32_t dynidx(grpc_chttp2_hpack_compressor *c, uint32_t elem_index) {
}
/* encode an mdelem */
-static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
- framer_state *st) {
+static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
+ grpc_mdelem *elem, framer_state *st) {
uint32_t key_hash = elem->key->hash;
uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
size_t decoder_space_usage;
@@ -417,7 +419,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st);
- add_elem(c, elem);
+ add_elem(exec_ctx, c, elem);
return;
} else {
emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st);
@@ -432,7 +434,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st);
- add_elem(c, elem);
+ add_elem(exec_ctx, c, elem);
return;
} else {
emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st);
@@ -445,7 +447,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
if (should_add_elem) {
emit_lithdr_incidx_v(c, elem, st);
- add_elem(c, elem);
+ add_elem(exec_ctx, c, elem);
return;
} else {
emit_lithdr_noidx_v(c, elem, st);
@@ -457,16 +459,17 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
#define STRLEN_LIT(x) (sizeof(x) - 1)
#define TIMEOUT_KEY "grpc-timeout"
-static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
+static void deadline_enc(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
framer_state *st) {
char timeout_str[GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
grpc_mdelem *mdelem;
grpc_http2_encode_timeout(
gpr_time_sub(deadline, gpr_now(deadline.clock_type)), timeout_str);
mdelem = grpc_mdelem_from_metadata_strings(
- GRPC_MDSTR_GRPC_TIMEOUT, grpc_mdstr_from_string(timeout_str));
- hpack_enc(c, mdelem, st);
- GRPC_MDELEM_UNREF(mdelem);
+ exec_ctx, GRPC_MDSTR_GRPC_TIMEOUT, grpc_mdstr_from_string(timeout_str));
+ hpack_enc(exec_ctx, c, mdelem, st);
+ GRPC_MDELEM_UNREF(exec_ctx, mdelem);
}
static uint32_t elems_for_bytes(uint32_t bytes) { return (bytes + 31) / 32; }
@@ -483,11 +486,12 @@ void grpc_chttp2_hpack_compressor_init(grpc_chttp2_hpack_compressor *c) {
sizeof(*c->table_elem_size) * c->cap_table_elems);
}
-void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c) {
+void grpc_chttp2_hpack_compressor_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_compressor *c) {
int i;
for (i = 0; i < GRPC_CHTTP2_HPACKC_NUM_VALUES; i++) {
- if (c->entries_keys[i]) GRPC_MDSTR_UNREF(c->entries_keys[i]);
- if (c->entries_elems[i]) GRPC_MDELEM_UNREF(c->entries_elems[i]);
+ if (c->entries_keys[i]) GRPC_MDSTR_UNREF(exec_ctx, c->entries_keys[i]);
+ if (c->entries_elems[i]) GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[i]);
}
gpr_free(c->table_elem_size);
}
@@ -542,7 +546,8 @@ void grpc_chttp2_hpack_compressor_set_max_table_size(
}
}
-void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c,
+void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_compressor *c,
uint32_t stream_id,
grpc_metadata_batch *metadata, int is_eof,
size_t max_frame_size,
@@ -571,11 +576,11 @@ void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c,
}
grpc_metadata_batch_assert_ok(metadata);
for (l = metadata->list.head; l; l = l->next) {
- hpack_enc(c, l->md, &st);
+ hpack_enc(exec_ctx, c, l->md, &st);
}
deadline = metadata->deadline;
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) {
- deadline_enc(c, deadline, &st);
+ deadline_enc(exec_ctx, c, deadline, &st);
}
finish_frame(&st, 1, is_eof);
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.h b/src/core/ext/transport/chttp2/transport/hpack_encoder.h
index bcbd675ca2..3a35496ec8 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.h
@@ -83,13 +83,15 @@ typedef struct {
} grpc_chttp2_hpack_compressor;
void grpc_chttp2_hpack_compressor_init(grpc_chttp2_hpack_compressor *c);
-void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c);
+void grpc_chttp2_hpack_compressor_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_compressor *c);
void grpc_chttp2_hpack_compressor_set_max_table_size(
grpc_chttp2_hpack_compressor *c, uint32_t max_table_size);
void grpc_chttp2_hpack_compressor_set_max_usable_size(
grpc_chttp2_hpack_compressor *c, uint32_t max_table_size);
-void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, uint32_t id,
+void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_compressor *c, uint32_t id,
grpc_metadata_batch *metadata, int is_eof,
size_t max_frame_size,
grpc_transport_one_way_stats *stats,
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index 6a9200b8db..8b91cc760b 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -670,11 +670,11 @@ static const uint8_t inverse_base64[256] = {
static grpc_error *on_hdr(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p,
grpc_mdelem *md, int add_to_table) {
if (add_to_table) {
- grpc_error *err = grpc_chttp2_hptbl_add(&p->table, md);
+ grpc_error *err = grpc_chttp2_hptbl_add(exec_ctx, &p->table, md);
if (err != GRPC_ERROR_NONE) return err;
}
if (p->on_header == NULL) {
- GRPC_MDELEM_UNREF(md);
+ GRPC_MDELEM_UNREF(exec_ctx, md);
return GRPC_ERROR_CREATE("on_header callback not set");
}
p->on_header(exec_ctx, p->on_header_user_data, md);
@@ -815,10 +815,10 @@ static grpc_error *finish_lithdr_incidx(grpc_exec_ctx *exec_ctx,
const uint8_t *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
GPR_ASSERT(md != NULL); /* handled in string parsing */
- grpc_error *err = on_hdr(
- exec_ctx, p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
- take_string(p, &p->value)),
- 1);
+ grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_REF(md->key),
+ take_string(p, &p->value)),
+ 1);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -828,10 +828,10 @@ static grpc_error *finish_lithdr_incidx_v(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
const uint8_t *cur,
const uint8_t *end) {
- grpc_error *err = on_hdr(
- exec_ctx, p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key),
- take_string(p, &p->value)),
- 1);
+ grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
+ exec_ctx, take_string(p, &p->key),
+ take_string(p, &p->value)),
+ 1);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -883,10 +883,10 @@ static grpc_error *finish_lithdr_notidx(grpc_exec_ctx *exec_ctx,
const uint8_t *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
GPR_ASSERT(md != NULL); /* handled in string parsing */
- grpc_error *err = on_hdr(
- exec_ctx, p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
- take_string(p, &p->value)),
- 0);
+ grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_REF(md->key),
+ take_string(p, &p->value)),
+ 0);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -896,10 +896,10 @@ static grpc_error *finish_lithdr_notidx_v(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
const uint8_t *cur,
const uint8_t *end) {
- grpc_error *err = on_hdr(
- exec_ctx, p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key),
- take_string(p, &p->value)),
- 0);
+ grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
+ exec_ctx, take_string(p, &p->key),
+ take_string(p, &p->value)),
+ 0);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -951,10 +951,10 @@ static grpc_error *finish_lithdr_nvridx(grpc_exec_ctx *exec_ctx,
const uint8_t *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
GPR_ASSERT(md != NULL); /* handled in string parsing */
- grpc_error *err = on_hdr(
- exec_ctx, p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
- take_string(p, &p->value)),
- 0);
+ grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_REF(md->key),
+ take_string(p, &p->value)),
+ 0);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -964,10 +964,10 @@ static grpc_error *finish_lithdr_nvridx_v(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
const uint8_t *cur,
const uint8_t *end) {
- grpc_error *err = on_hdr(
- exec_ctx, p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key),
- take_string(p, &p->value)),
- 0);
+ grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
+ exec_ctx, take_string(p, &p->key),
+ take_string(p, &p->value)),
+ 0);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -1020,7 +1020,7 @@ static grpc_error *finish_max_tbl_size(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_INFO, "MAX TABLE SIZE: %d", p->index);
}
grpc_error *err =
- grpc_chttp2_hptbl_set_current_table_size(&p->table, p->index);
+ grpc_chttp2_hptbl_set_current_table_size(exec_ctx, &p->table, p->index);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -1534,7 +1534,8 @@ static grpc_error *parse_value_string_with_literal_key(
/* PUBLIC INTERFACE */
-void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p) {
+void grpc_chttp2_hpack_parser_init(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_parser *p) {
p->on_header = NULL;
p->on_header_user_data = NULL;
p->state = parse_begin;
@@ -1546,7 +1547,7 @@ void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p) {
p->value.length = 0;
p->dynamic_table_update_allowed = 2;
p->last_error = GRPC_ERROR_NONE;
- grpc_chttp2_hptbl_init(&p->table);
+ grpc_chttp2_hptbl_init(exec_ctx, &p->table);
}
void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p) {
@@ -1554,8 +1555,9 @@ void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p) {
p->state = parse_stream_dep0;
}
-void grpc_chttp2_hpack_parser_destroy(grpc_chttp2_hpack_parser *p) {
- grpc_chttp2_hptbl_destroy(&p->table);
+void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_parser *p) {
+ grpc_chttp2_hptbl_destroy(exec_ctx, &p->table);
GRPC_ERROR_UNREF(p->last_error);
gpr_free(p->key.str);
gpr_free(p->value.str);
@@ -1634,10 +1636,11 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx,
however -- it might be that we receive a RST_STREAM following this
and can avoid the extra write */
GRPC_CHTTP2_STREAM_REF(s, "final_rst");
- grpc_combiner_execute_finally(
- exec_ctx, t->combiner,
- grpc_closure_create(force_client_rst_stream, s), GRPC_ERROR_NONE,
- false);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_create(force_client_rst_stream, s,
+ grpc_combiner_finally_scheduler(
+ t->combiner, false)),
+ GRPC_ERROR_NONE);
}
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE);
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.h b/src/core/ext/transport/chttp2/transport/hpack_parser.h
index a39bf466cd..52ccf1e7a7 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.h
@@ -99,8 +99,10 @@ struct grpc_chttp2_hpack_parser {
grpc_chttp2_hptbl table;
};
-void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p);
-void grpc_chttp2_hpack_parser_destroy(grpc_chttp2_hpack_parser *p);
+void grpc_chttp2_hpack_parser_init(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_parser *p);
+void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_parser *p);
void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p);
diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.c b/src/core/ext/transport/chttp2/transport/hpack_table.c
index 2dc793d304..26d4036d49 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_table.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_table.c
@@ -179,7 +179,7 @@ static uint32_t entries_for_bytes(uint32_t bytes) {
GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD;
}
-void grpc_chttp2_hptbl_init(grpc_chttp2_hptbl *tbl) {
+void grpc_chttp2_hptbl_init(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl) {
size_t i;
memset(tbl, 0, sizeof(*tbl));
@@ -190,18 +190,20 @@ void grpc_chttp2_hptbl_init(grpc_chttp2_hptbl *tbl) {
tbl->ents = gpr_malloc(sizeof(*tbl->ents) * tbl->cap_entries);
memset(tbl->ents, 0, sizeof(*tbl->ents) * tbl->cap_entries);
for (i = 1; i <= GRPC_CHTTP2_LAST_STATIC_ENTRY; i++) {
- tbl->static_ents[i - 1] =
- grpc_mdelem_from_strings(static_table[i].key, static_table[i].value);
+ tbl->static_ents[i - 1] = grpc_mdelem_from_strings(
+ exec_ctx, static_table[i].key, static_table[i].value);
}
}
-void grpc_chttp2_hptbl_destroy(grpc_chttp2_hptbl *tbl) {
+void grpc_chttp2_hptbl_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl) {
size_t i;
for (i = 0; i < GRPC_CHTTP2_LAST_STATIC_ENTRY; i++) {
- GRPC_MDELEM_UNREF(tbl->static_ents[i]);
+ GRPC_MDELEM_UNREF(exec_ctx, tbl->static_ents[i]);
}
for (i = 0; i < tbl->num_ents; i++) {
- GRPC_MDELEM_UNREF(tbl->ents[(tbl->first_ent + i) % tbl->cap_entries]);
+ GRPC_MDELEM_UNREF(exec_ctx,
+ tbl->ents[(tbl->first_ent + i) % tbl->cap_entries]);
}
gpr_free(tbl->ents);
}
@@ -224,7 +226,7 @@ grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl,
}
/* Evict one element from the table */
-static void evict1(grpc_chttp2_hptbl *tbl) {
+static void evict1(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl) {
grpc_mdelem *first_ent = tbl->ents[tbl->first_ent];
size_t elem_bytes = GRPC_SLICE_LENGTH(first_ent->key->slice) +
GRPC_SLICE_LENGTH(first_ent->value->slice) +
@@ -233,7 +235,7 @@ static void evict1(grpc_chttp2_hptbl *tbl) {
tbl->mem_used -= (uint32_t)elem_bytes;
tbl->first_ent = ((tbl->first_ent + 1) % tbl->cap_entries);
tbl->num_ents--;
- GRPC_MDELEM_UNREF(first_ent);
+ GRPC_MDELEM_UNREF(exec_ctx, first_ent);
}
static void rebuild_ents(grpc_chttp2_hptbl *tbl, uint32_t new_cap) {
@@ -249,7 +251,8 @@ static void rebuild_ents(grpc_chttp2_hptbl *tbl, uint32_t new_cap) {
tbl->first_ent = 0;
}
-void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl,
+void grpc_chttp2_hptbl_set_max_bytes(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl,
uint32_t max_bytes) {
if (tbl->max_bytes == max_bytes) {
return;
@@ -258,12 +261,13 @@ void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl,
gpr_log(GPR_DEBUG, "Update hpack parser max size to %d", max_bytes);
}
while (tbl->mem_used > max_bytes) {
- evict1(tbl);
+ evict1(exec_ctx, tbl);
}
tbl->max_bytes = max_bytes;
}
-grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
+grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl,
uint32_t bytes) {
if (tbl->current_table_bytes == bytes) {
return GRPC_ERROR_NONE;
@@ -281,7 +285,7 @@ grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
gpr_log(GPR_DEBUG, "Update hpack parser table size to %d", bytes);
}
while (tbl->mem_used > bytes) {
- evict1(tbl);
+ evict1(exec_ctx, tbl);
}
tbl->current_table_bytes = bytes;
tbl->max_entries = entries_for_bytes(bytes);
@@ -296,7 +300,8 @@ grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
return GRPC_ERROR_NONE;
}
-grpc_error *grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) {
+grpc_error *grpc_chttp2_hptbl_add(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl, grpc_mdelem *md) {
/* determine how many bytes of buffer this entry represents */
size_t elem_bytes = GRPC_SLICE_LENGTH(md->key->slice) +
GRPC_SLICE_LENGTH(md->value->slice) +
@@ -326,14 +331,14 @@ grpc_error *grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) {
* empty table.
*/
while (tbl->num_ents) {
- evict1(tbl);
+ evict1(exec_ctx, tbl);
}
return GRPC_ERROR_NONE;
}
/* evict entries to ensure no overflow */
while (elem_bytes > (size_t)tbl->current_table_bytes - tbl->mem_used) {
- evict1(tbl);
+ evict1(exec_ctx, tbl);
}
/* copy the finalized entry in */
diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.h b/src/core/ext/transport/chttp2/transport/hpack_table.h
index 2ca130e64b..144574ef06 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_table.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_table.h
@@ -84,18 +84,21 @@ typedef struct {
} grpc_chttp2_hptbl;
/* initialize a hpack table */
-void grpc_chttp2_hptbl_init(grpc_chttp2_hptbl *tbl);
-void grpc_chttp2_hptbl_destroy(grpc_chttp2_hptbl *tbl);
-void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl,
+void grpc_chttp2_hptbl_init(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl);
+void grpc_chttp2_hptbl_destroy(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl);
+void grpc_chttp2_hptbl_set_max_bytes(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl,
uint32_t max_bytes);
-grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
+grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl,
uint32_t bytes);
/* lookup a table entry based on its hpack index */
grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl,
uint32_t index);
/* add a table entry to the index */
-grpc_error *grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl,
+grpc_error *grpc_chttp2_hptbl_add(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl,
grpc_mdelem *md) GRPC_MUST_USE_RESULT;
/* Find a key/value pair in the table... returns the index in the table of the
most similar entry, or 0 if the value was not found */
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
index 3e463a7995..5d1094999c 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
@@ -46,11 +46,11 @@ void grpc_chttp2_incoming_metadata_buffer_init(
}
void grpc_chttp2_incoming_metadata_buffer_destroy(
- grpc_chttp2_incoming_metadata_buffer *buffer) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) {
size_t i;
if (!buffer->published) {
for (i = 0; i < buffer->count; i++) {
- GRPC_MDELEM_UNREF(buffer->elems[i].md);
+ GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
}
}
gpr_free(buffer->elems);
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
index df4343b93e..7a0c4da15f 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
@@ -49,7 +49,7 @@ typedef struct {
void grpc_chttp2_incoming_metadata_buffer_init(
grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_destroy(
- grpc_chttp2_incoming_metadata_buffer *buffer);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_publish(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index b727965d43..a52acbacdb 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -212,10 +212,8 @@ struct grpc_chttp2_transport {
grpc_closure write_action_begin_locked;
grpc_closure write_action;
- grpc_closure write_action_end;
grpc_closure write_action_end_locked;
- grpc_closure read_action_begin;
grpc_closure read_action_locked;
/** incoming read bytes */
@@ -336,10 +334,8 @@ struct grpc_chttp2_transport {
/** have we scheduled a destructive cleanup? */
bool destructive_reclaimer_registered;
/** benign cleanup closure */
- grpc_closure benign_reclaimer;
grpc_closure benign_reclaimer_locked;
/** destructive cleanup closure */
- grpc_closure destructive_reclaimer;
grpc_closure destructive_reclaimer_locked;
};
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 5efb49751c..4fb5dc7bd2 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -336,7 +336,7 @@ static grpc_error *skip_parser(grpc_exec_ctx *exec_ctx, void *parser,
}
static void skip_header(grpc_exec_ctx *exec_ctx, void *tp, grpc_mdelem *md) {
- GRPC_MDELEM_UNREF(md);
+ GRPC_MDELEM_UNREF(exec_ctx, md);
}
static grpc_error *init_skip_frame_parser(grpc_exec_ctx *exec_ctx,
@@ -477,7 +477,7 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
grpc_chttp2_incoming_metadata_buffer_set_deadline(
&s->metadata_buffer[0],
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), *cached_timeout));
- GRPC_MDELEM_UNREF(md);
+ GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
const size_t new_size = s->metadata_buffer[0].size + GRPC_MDELEM_LENGTH(md);
const size_t metadata_size_limit =
@@ -495,7 +495,7 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
s->seen_error = true;
- GRPC_MDELEM_UNREF(md);
+ GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md);
}
@@ -538,7 +538,7 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
s->seen_error = true;
- GRPC_MDELEM_UNREF(md);
+ GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md);
}
@@ -712,7 +712,7 @@ static grpc_error *init_settings_frame_parser(grpc_exec_ctx *exec_ctx,
memcpy(t->settings[GRPC_ACKED_SETTINGS], t->settings[GRPC_SENT_SETTINGS],
GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t));
grpc_chttp2_hptbl_set_max_bytes(
- &t->hpack_parser.table,
+ exec_ctx, &t->hpack_parser.table,
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
t->sent_local_settings = 0;
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 139e7387c4..ef2010af7b 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -39,6 +39,7 @@
#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
static void add_to_write_list(grpc_chttp2_write_cb **list,
grpc_chttp2_write_cb *cb) {
@@ -119,7 +120,7 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
/* send initial metadata if it's available */
if (!sent_initial_metadata && s->send_initial_metadata) {
grpc_chttp2_encode_header(
- &t->hpack_compressor, s->id, s->send_initial_metadata, 0,
+ exec_ctx, &t->hpack_compressor, s->id, s->send_initial_metadata, 0,
t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
&s->stats.outgoing, &t->outbuf);
s->send_initial_metadata = NULL;
@@ -186,9 +187,9 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
&s->stats.outgoing, &t->outbuf);
} else {
grpc_chttp2_encode_header(
- &t->hpack_compressor, s->id, s->send_trailing_metadata, true,
- t->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
+ exec_ctx, &t->hpack_compressor, s->id, s->send_trailing_metadata,
+ true, t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
&s->stats.outgoing, &t->outbuf);
}
s->send_trailing_metadata = NULL;
@@ -254,7 +255,7 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end");
}
- grpc_slice_buffer_reset_and_unref(&t->outbuf);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->outbuf);
GRPC_ERROR_UNREF(error);
GPR_TIMER_END("grpc_chttp2_end_write", 0);
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index afc59f4b12..296c406324 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -427,6 +427,7 @@ static void on_response_headers_received(
cronet_bidirectional_stream *stream,
const cronet_bidirectional_stream_header_array *headers,
const char *negotiated_protocol) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
headers, negotiated_protocol);
stream_obj *s = (stream_obj *)stream->annotation;
@@ -438,7 +439,7 @@ static void on_response_headers_received(
grpc_chttp2_incoming_metadata_buffer_add(
&s->state.rs.initial_metadata,
grpc_mdelem_from_metadata_strings(
- grpc_mdstr_from_string(headers->headers[i].key),
+ &exec_ctx, grpc_mdstr_from_string(headers->headers[i].key),
grpc_mdstr_from_string(headers->headers[i].value)));
}
s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
@@ -455,6 +456,7 @@ static void on_response_headers_received(
s->state.rs.remaining_bytes);
}
gpr_mu_unlock(&s->mu);
+ grpc_exec_ctx_finish(&exec_ctx);
execute_from_storage(s);
}
@@ -520,6 +522,7 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
static void on_response_trailers_received(
cronet_bidirectional_stream *stream,
const cronet_bidirectional_stream_header_array *trailers) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
trailers);
stream_obj *s = (stream_obj *)stream->annotation;
@@ -534,7 +537,7 @@ static void on_response_trailers_received(
grpc_chttp2_incoming_metadata_buffer_add(
&s->state.rs.trailing_metadata,
grpc_mdelem_from_metadata_strings(
- grpc_mdstr_from_string(trailers->headers[i].key),
+ &exec_ctx, grpc_mdstr_from_string(trailers->headers[i].key),
grpc_mdstr_from_string(trailers->headers[i].value)));
s->state.rs.trailing_metadata_valid = true;
if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
@@ -554,8 +557,10 @@ static void on_response_trailers_received(
s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
gpr_mu_unlock(&s->mu);
+ grpc_exec_ctx_finish(&exec_ctx);
} else {
gpr_mu_unlock(&s->mu);
+ grpc_exec_ctx_finish(&exec_ctx);
execute_from_storage(s);
}
}
@@ -849,17 +854,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_CANCELLED);
} else if (stream_state->state_callback_received[OP_FAILED]) {
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, stream_op->recv_initial_metadata_ready,
- make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_NONE);
}
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
@@ -910,22 +915,22 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_CANCELLED);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream failed.");
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, stream_op->recv_message_ready,
- make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
CRONET_LOG(GPR_DEBUG, "read stream closed");
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
@@ -958,8 +963,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
&stream_state->rs.read_slice_buffer, 0);
*((grpc_byte_buffer **)stream_op->recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
@@ -993,8 +998,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
&stream_state->rs.read_slice_buffer, 0);
*((grpc_byte_buffer **)stream_op->recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
/* Do an extra read to trigger on_succeeded() callback in case connection
@@ -1055,18 +1060,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
OP_ON_COMPLETE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
- grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete,
- GRPC_ERROR_REF(stream_state->cancel_error), NULL);
+ grpc_closure_sched(exec_ctx, stream_op->on_complete,
+ GRPC_ERROR_REF(stream_state->cancel_error));
} else if (stream_state->state_callback_received[OP_FAILED]) {
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, stream_op->on_complete,
- make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
} else {
/* All actions in this stream_op are complete. Call the on_complete
* callback
*/
- grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE,
- NULL);
+ grpc_closure_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE);
}
oas->state.state_op_done[OP_ON_COMPLETE] = true;
oas->done = true;