aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/chttp2/client/chttp2_connector.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/chttp2/client/chttp2_connector.cc')
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.cc70
1 files changed, 39 insertions, 31 deletions
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc
index db5962e7fd..819f66aec3 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc
@@ -61,34 +61,38 @@ static void chttp2_connector_ref(grpc_connector* con) {
gpr_ref(&c->refs);
}
-static void chttp2_connector_unref(grpc_connector* con) {
+static void chttp2_connector_unref(grpc_exec_ctx* exec_ctx,
+ grpc_connector* con) {
chttp2_connector* c = (chttp2_connector*)con;
if (gpr_unref(&c->refs)) {
gpr_mu_destroy(&c->mu);
// If handshaking is not yet in progress, destroy the endpoint.
// Otherwise, the handshaker will do this for us.
- if (c->endpoint != nullptr) grpc_endpoint_destroy(c->endpoint);
+ if (c->endpoint != nullptr) grpc_endpoint_destroy(exec_ctx, c->endpoint);
gpr_free(c);
}
}
-static void chttp2_connector_shutdown(grpc_connector* con, grpc_error* why) {
+static void chttp2_connector_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_connector* con, grpc_error* why) {
chttp2_connector* c = (chttp2_connector*)con;
gpr_mu_lock(&c->mu);
c->shutdown = true;
if (c->handshake_mgr != nullptr) {
- grpc_handshake_manager_shutdown(c->handshake_mgr, GRPC_ERROR_REF(why));
+ grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr,
+ GRPC_ERROR_REF(why));
}
// If handshaking is not yet in progress, shutdown the endpoint.
// Otherwise, the handshaker will do this for us.
if (!c->connecting && c->endpoint != nullptr) {
- grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(why));
+ grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(why));
}
gpr_mu_unlock(&c->mu);
GRPC_ERROR_UNREF(why);
}
-static void on_handshake_done(void* arg, grpc_error* error) {
+static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
grpc_handshaker_args* args = (grpc_handshaker_args*)arg;
chttp2_connector* c = (chttp2_connector*)args->user_data;
gpr_mu_lock(&c->mu);
@@ -101,20 +105,20 @@ static void on_handshake_done(void* arg, grpc_error* error) {
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
- grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_REF(error));
- grpc_endpoint_destroy(args->endpoint);
- grpc_channel_args_destroy(args->args);
- grpc_slice_buffer_destroy_internal(args->read_buffer);
+ grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_REF(error));
+ grpc_endpoint_destroy(exec_ctx, args->endpoint);
+ grpc_channel_args_destroy(exec_ctx, args->args);
+ grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
gpr_free(args->read_buffer);
} else {
error = GRPC_ERROR_REF(error);
}
memset(c->result, 0, sizeof(*c->result));
} else {
- grpc_endpoint_delete_from_pollset_set(args->endpoint,
+ grpc_endpoint_delete_from_pollset_set(exec_ctx, args->endpoint,
c->args.interested_parties);
- c->result->transport =
- grpc_create_chttp2_transport(args->args, args->endpoint, true);
+ c->result->transport = grpc_create_chttp2_transport(exec_ctx, args->args,
+ args->endpoint, true);
GPR_ASSERT(c->result->transport);
// TODO(roth): We ideally want to wait until we receive HTTP/2
// settings from the server before we consider the connection
@@ -140,32 +144,34 @@ static void on_handshake_done(void* arg, grpc_error* error) {
// so until after transparent retries is implemented. Otherwise, any
// RPC that we attempt to send on the connection before the timeout
// would fail instead of being retried on a subsequent attempt.
- grpc_chttp2_transport_start_reading(c->result->transport, args->read_buffer,
- nullptr);
+ grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
+ args->read_buffer, nullptr);
c->result->channel_args = args->args;
}
grpc_closure* notify = c->notify;
c->notify = nullptr;
- GRPC_CLOSURE_SCHED(notify, error);
- grpc_handshake_manager_destroy(c->handshake_mgr);
+ GRPC_CLOSURE_SCHED(exec_ctx, notify, error);
+ grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
c->handshake_mgr = nullptr;
gpr_mu_unlock(&c->mu);
- chttp2_connector_unref((grpc_connector*)c);
+ chttp2_connector_unref(exec_ctx, (grpc_connector*)c);
}
-static void start_handshake_locked(chttp2_connector* c) {
+static void start_handshake_locked(grpc_exec_ctx* exec_ctx,
+ chttp2_connector* c) {
c->handshake_mgr = grpc_handshake_manager_create();
- grpc_handshakers_add(HANDSHAKER_CLIENT, c->args.channel_args,
+ grpc_handshakers_add(exec_ctx, HANDSHAKER_CLIENT, c->args.channel_args,
c->handshake_mgr);
- grpc_endpoint_add_to_pollset_set(c->endpoint, c->args.interested_parties);
+ grpc_endpoint_add_to_pollset_set(exec_ctx, c->endpoint,
+ c->args.interested_parties);
grpc_handshake_manager_do_handshake(
- c->handshake_mgr, c->args.interested_parties, c->endpoint,
+ exec_ctx, c->handshake_mgr, c->args.interested_parties, c->endpoint,
c->args.channel_args, c->args.deadline, nullptr /* acceptor */,
on_handshake_done, c);
c->endpoint = nullptr; // Endpoint handed off to handshake manager.
}
-static void connected(void* arg, grpc_error* error) {
+static void connected(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
chttp2_connector* c = (chttp2_connector*)arg;
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->connecting);
@@ -179,26 +185,27 @@ static void connected(void* arg, grpc_error* error) {
memset(c->result, 0, sizeof(*c->result));
grpc_closure* notify = c->notify;
c->notify = nullptr;
- GRPC_CLOSURE_SCHED(notify, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, notify, error);
if (c->endpoint != nullptr) {
- grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(error));
+ grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error));
}
gpr_mu_unlock(&c->mu);
- chttp2_connector_unref((grpc_connector*)arg);
+ chttp2_connector_unref(exec_ctx, (grpc_connector*)arg);
} else {
GPR_ASSERT(c->endpoint != nullptr);
- start_handshake_locked(c);
+ start_handshake_locked(exec_ctx, c);
gpr_mu_unlock(&c->mu);
}
}
-static void chttp2_connector_connect(grpc_connector* con,
+static void chttp2_connector_connect(grpc_exec_ctx* exec_ctx,
+ grpc_connector* con,
const grpc_connect_in_args* args,
grpc_connect_out_args* result,
grpc_closure* notify) {
chttp2_connector* c = (chttp2_connector*)con;
grpc_resolved_address addr;
- grpc_get_subchannel_address_arg(args->channel_args, &addr);
+ grpc_get_subchannel_address_arg(exec_ctx, args->channel_args, &addr);
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->notify == nullptr);
c->notify = notify;
@@ -209,8 +216,9 @@ static void chttp2_connector_connect(grpc_connector* con,
GRPC_CLOSURE_INIT(&c->connected, connected, c, grpc_schedule_on_exec_ctx);
GPR_ASSERT(!c->connecting);
c->connecting = true;
- grpc_tcp_client_connect(&c->connected, &c->endpoint, args->interested_parties,
- args->channel_args, &addr, args->deadline);
+ grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint,
+ args->interested_parties, args->channel_args, &addr,
+ args->deadline);
gpr_mu_unlock(&c->mu);
}