aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/chttp2/client
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/chttp2/client')
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.cc70
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.cc32
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc16
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc51
4 files changed, 92 insertions, 77 deletions
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc
index db5962e7fd..819f66aec3 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc
@@ -61,34 +61,38 @@ static void chttp2_connector_ref(grpc_connector* con) {
gpr_ref(&c->refs);
}
-static void chttp2_connector_unref(grpc_connector* con) {
+static void chttp2_connector_unref(grpc_exec_ctx* exec_ctx,
+ grpc_connector* con) {
chttp2_connector* c = (chttp2_connector*)con;
if (gpr_unref(&c->refs)) {
gpr_mu_destroy(&c->mu);
// If handshaking is not yet in progress, destroy the endpoint.
// Otherwise, the handshaker will do this for us.
- if (c->endpoint != nullptr) grpc_endpoint_destroy(c->endpoint);
+ if (c->endpoint != nullptr) grpc_endpoint_destroy(exec_ctx, c->endpoint);
gpr_free(c);
}
}
-static void chttp2_connector_shutdown(grpc_connector* con, grpc_error* why) {
+static void chttp2_connector_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_connector* con, grpc_error* why) {
chttp2_connector* c = (chttp2_connector*)con;
gpr_mu_lock(&c->mu);
c->shutdown = true;
if (c->handshake_mgr != nullptr) {
- grpc_handshake_manager_shutdown(c->handshake_mgr, GRPC_ERROR_REF(why));
+ grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr,
+ GRPC_ERROR_REF(why));
}
// If handshaking is not yet in progress, shutdown the endpoint.
// Otherwise, the handshaker will do this for us.
if (!c->connecting && c->endpoint != nullptr) {
- grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(why));
+ grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(why));
}
gpr_mu_unlock(&c->mu);
GRPC_ERROR_UNREF(why);
}
-static void on_handshake_done(void* arg, grpc_error* error) {
+static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
grpc_handshaker_args* args = (grpc_handshaker_args*)arg;
chttp2_connector* c = (chttp2_connector*)args->user_data;
gpr_mu_lock(&c->mu);
@@ -101,20 +105,20 @@ static void on_handshake_done(void* arg, grpc_error* error) {
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
- grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_REF(error));
- grpc_endpoint_destroy(args->endpoint);
- grpc_channel_args_destroy(args->args);
- grpc_slice_buffer_destroy_internal(args->read_buffer);
+ grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_REF(error));
+ grpc_endpoint_destroy(exec_ctx, args->endpoint);
+ grpc_channel_args_destroy(exec_ctx, args->args);
+ grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
gpr_free(args->read_buffer);
} else {
error = GRPC_ERROR_REF(error);
}
memset(c->result, 0, sizeof(*c->result));
} else {
- grpc_endpoint_delete_from_pollset_set(args->endpoint,
+ grpc_endpoint_delete_from_pollset_set(exec_ctx, args->endpoint,
c->args.interested_parties);
- c->result->transport =
- grpc_create_chttp2_transport(args->args, args->endpoint, true);
+ c->result->transport = grpc_create_chttp2_transport(exec_ctx, args->args,
+ args->endpoint, true);
GPR_ASSERT(c->result->transport);
// TODO(roth): We ideally want to wait until we receive HTTP/2
// settings from the server before we consider the connection
@@ -140,32 +144,34 @@ static void on_handshake_done(void* arg, grpc_error* error) {
// so until after transparent retries is implemented. Otherwise, any
// RPC that we attempt to send on the connection before the timeout
// would fail instead of being retried on a subsequent attempt.
- grpc_chttp2_transport_start_reading(c->result->transport, args->read_buffer,
- nullptr);
+ grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
+ args->read_buffer, nullptr);
c->result->channel_args = args->args;
}
grpc_closure* notify = c->notify;
c->notify = nullptr;
- GRPC_CLOSURE_SCHED(notify, error);
- grpc_handshake_manager_destroy(c->handshake_mgr);
+ GRPC_CLOSURE_SCHED(exec_ctx, notify, error);
+ grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
c->handshake_mgr = nullptr;
gpr_mu_unlock(&c->mu);
- chttp2_connector_unref((grpc_connector*)c);
+ chttp2_connector_unref(exec_ctx, (grpc_connector*)c);
}
-static void start_handshake_locked(chttp2_connector* c) {
+static void start_handshake_locked(grpc_exec_ctx* exec_ctx,
+ chttp2_connector* c) {
c->handshake_mgr = grpc_handshake_manager_create();
- grpc_handshakers_add(HANDSHAKER_CLIENT, c->args.channel_args,
+ grpc_handshakers_add(exec_ctx, HANDSHAKER_CLIENT, c->args.channel_args,
c->handshake_mgr);
- grpc_endpoint_add_to_pollset_set(c->endpoint, c->args.interested_parties);
+ grpc_endpoint_add_to_pollset_set(exec_ctx, c->endpoint,
+ c->args.interested_parties);
grpc_handshake_manager_do_handshake(
- c->handshake_mgr, c->args.interested_parties, c->endpoint,
+ exec_ctx, c->handshake_mgr, c->args.interested_parties, c->endpoint,
c->args.channel_args, c->args.deadline, nullptr /* acceptor */,
on_handshake_done, c);
c->endpoint = nullptr; // Endpoint handed off to handshake manager.
}
-static void connected(void* arg, grpc_error* error) {
+static void connected(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
chttp2_connector* c = (chttp2_connector*)arg;
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->connecting);
@@ -179,26 +185,27 @@ static void connected(void* arg, grpc_error* error) {
memset(c->result, 0, sizeof(*c->result));
grpc_closure* notify = c->notify;
c->notify = nullptr;
- GRPC_CLOSURE_SCHED(notify, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, notify, error);
if (c->endpoint != nullptr) {
- grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(error));
+ grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error));
}
gpr_mu_unlock(&c->mu);
- chttp2_connector_unref((grpc_connector*)arg);
+ chttp2_connector_unref(exec_ctx, (grpc_connector*)arg);
} else {
GPR_ASSERT(c->endpoint != nullptr);
- start_handshake_locked(c);
+ start_handshake_locked(exec_ctx, c);
gpr_mu_unlock(&c->mu);
}
}
-static void chttp2_connector_connect(grpc_connector* con,
+static void chttp2_connector_connect(grpc_exec_ctx* exec_ctx,
+ grpc_connector* con,
const grpc_connect_in_args* args,
grpc_connect_out_args* result,
grpc_closure* notify) {
chttp2_connector* c = (chttp2_connector*)con;
grpc_resolved_address addr;
- grpc_get_subchannel_address_arg(args->channel_args, &addr);
+ grpc_get_subchannel_address_arg(exec_ctx, args->channel_args, &addr);
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->notify == nullptr);
c->notify = notify;
@@ -209,8 +216,9 @@ static void chttp2_connector_connect(grpc_connector* con,
GRPC_CLOSURE_INIT(&c->connected, connected, c, grpc_schedule_on_exec_ctx);
GPR_ASSERT(!c->connecting);
c->connecting = true;
- grpc_tcp_client_connect(&c->connected, &c->endpoint, args->interested_parties,
- args->channel_args, &addr, args->deadline);
+ grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint,
+ args->interested_parties, args->channel_args, &addr,
+ args->deadline);
gpr_mu_unlock(&c->mu);
}
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc
index 6a1b70964d..028b69e5ff 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc
@@ -34,19 +34,21 @@ static void client_channel_factory_ref(
grpc_client_channel_factory* cc_factory) {}
static void client_channel_factory_unref(
- grpc_client_channel_factory* cc_factory) {}
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* cc_factory) {}
static grpc_subchannel* client_channel_factory_create_subchannel(
- grpc_client_channel_factory* cc_factory, const grpc_subchannel_args* args) {
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* cc_factory,
+ const grpc_subchannel_args* args) {
grpc_connector* connector = grpc_chttp2_connector_create();
- grpc_subchannel* s = grpc_subchannel_create(connector, args);
- grpc_connector_unref(connector);
+ grpc_subchannel* s = grpc_subchannel_create(exec_ctx, connector, args);
+ grpc_connector_unref(exec_ctx, connector);
return s;
}
static grpc_channel* client_channel_factory_create_channel(
- grpc_client_channel_factory* cc_factory, const char* target,
- grpc_client_channel_type type, const grpc_channel_args* args) {
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* cc_factory,
+ const char* target, grpc_client_channel_type type,
+ const grpc_channel_args* args) {
if (target == nullptr) {
gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
return nullptr;
@@ -54,14 +56,14 @@ static grpc_channel* client_channel_factory_create_channel(
// Add channel arg containing the server URI.
grpc_arg arg = grpc_channel_arg_string_create(
(char*)GRPC_ARG_SERVER_URI,
- grpc_resolver_factory_add_default_prefix_if_needed(target));
+ grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target));
const char* to_remove[] = {GRPC_ARG_SERVER_URI};
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
gpr_free(arg.value.string);
- grpc_channel* channel =
- grpc_channel_create(target, new_args, GRPC_CLIENT_CHANNEL, nullptr);
- grpc_channel_args_destroy(new_args);
+ grpc_channel* channel = grpc_channel_create(exec_ctx, target, new_args,
+ GRPC_CLIENT_CHANNEL, nullptr);
+ grpc_channel_args_destroy(exec_ctx, new_args);
return channel;
}
@@ -80,7 +82,7 @@ static grpc_client_channel_factory client_channel_factory = {
grpc_channel* grpc_insecure_channel_create(const char* target,
const grpc_channel_args* args,
void* reserved) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
"grpc_insecure_channel_create(target=%s, args=%p, reserved=%p)", 3,
(target, args, reserved));
@@ -91,11 +93,11 @@ grpc_channel* grpc_insecure_channel_create(const char* target,
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
// Create channel.
grpc_channel* channel = client_channel_factory_create_channel(
- &client_channel_factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR,
- new_args);
+ &exec_ctx, &client_channel_factory, target,
+ GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
// Clean up.
- grpc_channel_args_destroy(new_args);
-
+ grpc_channel_args_destroy(&exec_ctx, new_args);
+ grpc_exec_ctx_finish(&exec_ctx);
return channel != nullptr ? channel
: grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL,
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
index 0cdea5a94e..c6b149d0b1 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
@@ -37,7 +37,7 @@
grpc_channel* grpc_insecure_channel_create_from_fd(
const char* target, int fd, const grpc_channel_args* args) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE("grpc_insecure_channel_create(target=%p, fd=%d, args=%p)", 3,
(target, fd, args));
@@ -50,17 +50,17 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
grpc_endpoint* client = grpc_tcp_client_create_from_fd(
- grpc_fd_create(fd, "client"), args, "fd-client");
+ &exec_ctx, grpc_fd_create(fd, "client"), args, "fd-client");
grpc_transport* transport =
- grpc_create_chttp2_transport(final_args, client, true);
+ grpc_create_chttp2_transport(&exec_ctx, final_args, client, true);
GPR_ASSERT(transport);
grpc_channel* channel = grpc_channel_create(
- target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
- grpc_channel_args_destroy(final_args);
- grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
+ &exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
+ grpc_channel_args_destroy(&exec_ctx, final_args);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_finish(&exec_ctx);
return channel != nullptr ? channel
: grpc_lame_client_channel_create(
@@ -73,7 +73,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
grpc_channel* grpc_insecure_channel_create_from_fd(
const char* target, int fd, const grpc_channel_args* args) {
GPR_ASSERT(0);
- return nullptr;
+ return NULL;
}
#endif // GPR_SUPPORT_CHANNELS_FROM_FD
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
index 27c5b96a4c..dd2bc427a7 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
@@ -41,10 +41,10 @@ static void client_channel_factory_ref(
grpc_client_channel_factory* cc_factory) {}
static void client_channel_factory_unref(
- grpc_client_channel_factory* cc_factory) {}
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* cc_factory) {}
static grpc_subchannel_args* get_secure_naming_subchannel_args(
- const grpc_subchannel_args* args) {
+ grpc_exec_ctx* exec_ctx, const grpc_subchannel_args* args) {
grpc_channel_credentials* channel_credentials =
grpc_channel_credentials_find_in_args(args->args);
if (channel_credentials == nullptr) {
@@ -68,7 +68,7 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
const char* server_uri_str = server_uri_arg->value.string;
GPR_ASSERT(server_uri_str != nullptr);
grpc_uri* server_uri =
- grpc_uri_parse(server_uri_str, true /* supress errors */);
+ grpc_uri_parse(exec_ctx, server_uri_str, true /* supress errors */);
GPR_ASSERT(server_uri != nullptr);
const char* server_uri_path;
server_uri_path =
@@ -81,7 +81,7 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
const char* target_uri_str =
grpc_get_subchannel_address_uri_arg(args->args);
grpc_uri* target_uri =
- grpc_uri_parse(target_uri_str, false /* suppress errors */);
+ grpc_uri_parse(exec_ctx, target_uri_str, false /* suppress errors */);
GPR_ASSERT(target_uri != nullptr);
if (target_uri->path[0] != '\0') { // "path" may be empty
const grpc_slice key = grpc_slice_from_static_string(
@@ -89,7 +89,7 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
const char* value =
(const char*)grpc_slice_hash_table_get(targets_info, key);
if (value != nullptr) target_name_to_check = gpr_strdup(value);
- grpc_slice_unref_internal(key);
+ grpc_slice_unref_internal(exec_ctx, key);
}
if (target_name_to_check == nullptr) {
// If the target name to check hasn't already been set, fall back to using
@@ -107,7 +107,7 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
grpc_channel_args* new_args_from_connector = nullptr;
const grpc_security_status security_status =
grpc_channel_credentials_create_security_connector(
- channel_credentials, target_name_to_check, args->args,
+ exec_ctx, channel_credentials, target_name_to_check, args->args,
&subchannel_security_connector, &new_args_from_connector);
if (security_status != GRPC_SECURITY_OK) {
gpr_log(GPR_ERROR,
@@ -123,10 +123,10 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
new_args_from_connector != nullptr ? new_args_from_connector : args->args,
&new_security_connector_arg, 1);
- GRPC_SECURITY_CONNECTOR_UNREF(&subchannel_security_connector->base,
+ GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, &subchannel_security_connector->base,
"lb_channel_create");
if (new_args_from_connector != nullptr) {
- grpc_channel_args_destroy(new_args_from_connector);
+ grpc_channel_args_destroy(exec_ctx, new_args_from_connector);
}
grpc_subchannel_args* final_sc_args =
(grpc_subchannel_args*)gpr_malloc(sizeof(*final_sc_args));
@@ -136,9 +136,10 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
}
static grpc_subchannel* client_channel_factory_create_subchannel(
- grpc_client_channel_factory* cc_factory, const grpc_subchannel_args* args) {
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* cc_factory,
+ const grpc_subchannel_args* args) {
grpc_subchannel_args* subchannel_args =
- get_secure_naming_subchannel_args(args);
+ get_secure_naming_subchannel_args(exec_ctx, args);
if (subchannel_args == nullptr) {
gpr_log(
GPR_ERROR,
@@ -146,16 +147,19 @@ static grpc_subchannel* client_channel_factory_create_subchannel(
return nullptr;
}
grpc_connector* connector = grpc_chttp2_connector_create();
- grpc_subchannel* s = grpc_subchannel_create(connector, subchannel_args);
- grpc_connector_unref(connector);
- grpc_channel_args_destroy((grpc_channel_args*)subchannel_args->args);
+ grpc_subchannel* s =
+ grpc_subchannel_create(exec_ctx, connector, subchannel_args);
+ grpc_connector_unref(exec_ctx, connector);
+ grpc_channel_args_destroy(exec_ctx,
+ (grpc_channel_args*)subchannel_args->args);
gpr_free(subchannel_args);
return s;
}
static grpc_channel* client_channel_factory_create_channel(
- grpc_client_channel_factory* cc_factory, const char* target,
- grpc_client_channel_type type, const grpc_channel_args* args) {
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* cc_factory,
+ const char* target, grpc_client_channel_type type,
+ const grpc_channel_args* args) {
if (target == nullptr) {
gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
return nullptr;
@@ -163,14 +167,14 @@ static grpc_channel* client_channel_factory_create_channel(
// Add channel arg containing the server URI.
grpc_arg arg = grpc_channel_arg_string_create(
(char*)GRPC_ARG_SERVER_URI,
- grpc_resolver_factory_add_default_prefix_if_needed(target));
+ grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target));
const char* to_remove[] = {GRPC_ARG_SERVER_URI};
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
gpr_free(arg.value.string);
- grpc_channel* channel =
- grpc_channel_create(target, new_args, GRPC_CLIENT_CHANNEL, nullptr);
- grpc_channel_args_destroy(new_args);
+ grpc_channel* channel = grpc_channel_create(exec_ctx, target, new_args,
+ GRPC_CLIENT_CHANNEL, nullptr);
+ grpc_channel_args_destroy(exec_ctx, new_args);
return channel;
}
@@ -190,7 +194,7 @@ grpc_channel* grpc_secure_channel_create(grpc_channel_credentials* creds,
const char* target,
const grpc_channel_args* args,
void* reserved) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
"grpc_secure_channel_create(creds=%p, target=%s, args=%p, "
"reserved=%p)",
@@ -207,10 +211,11 @@ grpc_channel* grpc_secure_channel_create(grpc_channel_credentials* creds,
args, args_to_add, GPR_ARRAY_SIZE(args_to_add));
// Create channel.
channel = client_channel_factory_create_channel(
- &client_channel_factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR,
- new_args);
+ &exec_ctx, &client_channel_factory, target,
+ GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
// Clean up.
- grpc_channel_args_destroy(new_args);
+ grpc_channel_args_destroy(&exec_ctx, new_args);
+ grpc_exec_ctx_finish(&exec_ctx);
}
return channel != nullptr ? channel
: grpc_lame_client_channel_create(