aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/security/transport/secure_endpoint.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/security/transport/secure_endpoint.cc')
-rw-r--r--src/core/lib/security/transport/secure_endpoint.cc120
1 files changed, 67 insertions, 53 deletions
diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc
index e5c089de9c..4cd317a06d 100644
--- a/src/core/lib/security/transport/secure_endpoint.cc
+++ b/src/core/lib/security/transport/secure_endpoint.cc
@@ -63,27 +63,28 @@ typedef struct {
grpc_core::TraceFlag grpc_trace_secure_endpoint(false, "secure_endpoint");
-static void destroy(secure_endpoint* secure_ep) {
+static void destroy(grpc_exec_ctx* exec_ctx, secure_endpoint* secure_ep) {
secure_endpoint* ep = secure_ep;
- grpc_endpoint_destroy(ep->wrapped_ep);
+ grpc_endpoint_destroy(exec_ctx, ep->wrapped_ep);
tsi_frame_protector_destroy(ep->protector);
- tsi_zero_copy_grpc_protector_destroy(ep->zero_copy_protector);
- grpc_slice_buffer_destroy_internal(&ep->leftover_bytes);
- grpc_slice_unref_internal(ep->read_staging_buffer);
- grpc_slice_unref_internal(ep->write_staging_buffer);
- grpc_slice_buffer_destroy_internal(&ep->output_buffer);
- grpc_slice_buffer_destroy_internal(&ep->source_buffer);
+ tsi_zero_copy_grpc_protector_destroy(exec_ctx, ep->zero_copy_protector);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &ep->leftover_bytes);
+ grpc_slice_unref_internal(exec_ctx, ep->read_staging_buffer);
+ grpc_slice_unref_internal(exec_ctx, ep->write_staging_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &ep->output_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &ep->source_buffer);
gpr_mu_destroy(&ep->protector_mu);
gpr_free(ep);
}
#ifndef NDEBUG
-#define SECURE_ENDPOINT_UNREF(ep, reason) \
- secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
+#define SECURE_ENDPOINT_UNREF(exec_ctx, ep, reason) \
+ secure_endpoint_unref((exec_ctx), (ep), (reason), __FILE__, __LINE__)
#define SECURE_ENDPOINT_REF(ep, reason) \
secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
-static void secure_endpoint_unref(secure_endpoint* ep, const char* reason,
- const char* file, int line) {
+static void secure_endpoint_unref(grpc_exec_ctx* exec_ctx, secure_endpoint* ep,
+ const char* reason, const char* file,
+ int line) {
if (grpc_trace_secure_endpoint.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -91,7 +92,7 @@ static void secure_endpoint_unref(secure_endpoint* ep, const char* reason,
val - 1);
}
if (gpr_unref(&ep->ref)) {
- destroy(ep);
+ destroy(exec_ctx, ep);
}
}
@@ -106,11 +107,13 @@ static void secure_endpoint_ref(secure_endpoint* ep, const char* reason,
gpr_ref(&ep->ref);
}
#else
-#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
+#define SECURE_ENDPOINT_UNREF(exec_ctx, ep, reason) \
+ secure_endpoint_unref((exec_ctx), (ep))
#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
-static void secure_endpoint_unref(secure_endpoint* ep) {
+static void secure_endpoint_unref(grpc_exec_ctx* exec_ctx,
+ secure_endpoint* ep) {
if (gpr_unref(&ep->ref)) {
- destroy(ep);
+ destroy(exec_ctx, ep);
}
}
@@ -125,7 +128,8 @@ static void flush_read_staging_buffer(secure_endpoint* ep, uint8_t** cur,
*end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
}
-static void call_read_cb(secure_endpoint* ep, grpc_error* error) {
+static void call_read_cb(grpc_exec_ctx* exec_ctx, secure_endpoint* ep,
+ grpc_error* error) {
if (grpc_trace_secure_endpoint.enabled()) {
size_t i;
for (i = 0; i < ep->read_buffer->count; i++) {
@@ -136,11 +140,12 @@ static void call_read_cb(secure_endpoint* ep, grpc_error* error) {
}
}
ep->read_buffer = nullptr;
- GRPC_CLOSURE_SCHED(ep->read_cb, error);
- SECURE_ENDPOINT_UNREF(ep, "read");
+ GRPC_CLOSURE_SCHED(exec_ctx, ep->read_cb, error);
+ SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read");
}
-static void on_read(void* user_data, grpc_error* error) {
+static void on_read(grpc_exec_ctx* exec_ctx, void* user_data,
+ grpc_error* error) {
unsigned i;
uint8_t keep_looping = 0;
tsi_result result = TSI_OK;
@@ -149,16 +154,17 @@ static void on_read(void* user_data, grpc_error* error) {
uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
if (error != GRPC_ERROR_NONE) {
- grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
- call_read_cb(ep, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Secure read failed", &error, 1));
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, ep->read_buffer);
+ call_read_cb(exec_ctx, ep,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Secure read failed", &error, 1));
return;
}
if (ep->zero_copy_protector != nullptr) {
// Use zero-copy grpc protector to unprotect.
result = tsi_zero_copy_grpc_protector_unprotect(
- ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer);
+ exec_ctx, ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer);
} else {
// Use frame protector to unprotect.
/* TODO(yangg) check error, maybe bail out early */
@@ -211,35 +217,37 @@ static void on_read(void* user_data, grpc_error* error) {
/* TODO(yangg) experiment with moving this block after read_cb to see if it
helps latency */
- grpc_slice_buffer_reset_and_unref_internal(&ep->source_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->source_buffer);
if (result != TSI_OK) {
- grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, ep->read_buffer);
call_read_cb(
- ep, grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unwrap failed"), result));
+ exec_ctx, ep,
+ grpc_set_tsi_error_result(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unwrap failed"), result));
return;
}
- call_read_cb(ep, GRPC_ERROR_NONE);
+ call_read_cb(exec_ctx, ep, GRPC_ERROR_NONE);
}
-static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+static void endpoint_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* secure_ep,
+ grpc_slice_buffer* slices, grpc_closure* cb) {
secure_endpoint* ep = (secure_endpoint*)secure_ep;
ep->read_cb = cb;
ep->read_buffer = slices;
- grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, ep->read_buffer);
SECURE_ENDPOINT_REF(ep, "read");
if (ep->leftover_bytes.count) {
grpc_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
GPR_ASSERT(ep->leftover_bytes.count == 0);
- on_read(ep, GRPC_ERROR_NONE);
+ on_read(exec_ctx, ep, GRPC_ERROR_NONE);
return;
}
- grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read);
+ grpc_endpoint_read(exec_ctx, ep->wrapped_ep, &ep->source_buffer,
+ &ep->on_read);
}
static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
@@ -250,8 +258,8 @@ static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
*end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
}
-static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+static void endpoint_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* secure_ep,
+ grpc_slice_buffer* slices, grpc_closure* cb) {
GPR_TIMER_BEGIN("secure_endpoint.endpoint_write", 0);
unsigned i;
@@ -260,7 +268,7 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
uint8_t* cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
uint8_t* end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
- grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->output_buffer);
if (grpc_trace_secure_endpoint.enabled()) {
for (i = 0; i < slices->count; i++) {
@@ -273,8 +281,8 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
if (ep->zero_copy_protector != nullptr) {
// Use zero-copy grpc protector to protect.
- result = tsi_zero_copy_grpc_protector_protect(ep->zero_copy_protector,
- slices, &ep->output_buffer);
+ result = tsi_zero_copy_grpc_protector_protect(
+ exec_ctx, ep->zero_copy_protector, slices, &ep->output_buffer);
} else {
// Use frame protector to protect.
for (i = 0; i < slices->count; i++) {
@@ -332,44 +340,50 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
- grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->output_buffer);
GRPC_CLOSURE_SCHED(
- cb, grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result));
+ exec_ctx, cb,
+ grpc_set_tsi_error_result(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result));
GPR_TIMER_END("secure_endpoint.endpoint_write", 0);
return;
}
- grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb);
+ grpc_endpoint_write(exec_ctx, ep->wrapped_ep, &ep->output_buffer, cb);
GPR_TIMER_END("secure_endpoint.endpoint_write", 0);
}
-static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) {
+static void endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* secure_ep,
+ grpc_error* why) {
secure_endpoint* ep = (secure_endpoint*)secure_ep;
- grpc_endpoint_shutdown(ep->wrapped_ep, why);
+ grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep, why);
}
-static void endpoint_destroy(grpc_endpoint* secure_ep) {
+static void endpoint_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* secure_ep) {
secure_endpoint* ep = (secure_endpoint*)secure_ep;
- SECURE_ENDPOINT_UNREF(ep, "destroy");
+ SECURE_ENDPOINT_UNREF(exec_ctx, ep, "destroy");
}
-static void endpoint_add_to_pollset(grpc_endpoint* secure_ep,
+static void endpoint_add_to_pollset(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* secure_ep,
grpc_pollset* pollset) {
secure_endpoint* ep = (secure_endpoint*)secure_ep;
- grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
+ grpc_endpoint_add_to_pollset(exec_ctx, ep->wrapped_ep, pollset);
}
-static void endpoint_add_to_pollset_set(grpc_endpoint* secure_ep,
+static void endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* secure_ep,
grpc_pollset_set* pollset_set) {
secure_endpoint* ep = (secure_endpoint*)secure_ep;
- grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
+ grpc_endpoint_add_to_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set);
}
-static void endpoint_delete_from_pollset_set(grpc_endpoint* secure_ep,
+static void endpoint_delete_from_pollset_set(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* secure_ep,
grpc_pollset_set* pollset_set) {
secure_endpoint* ep = (secure_endpoint*)secure_ep;
- grpc_endpoint_delete_from_pollset_set(ep->wrapped_ep, pollset_set);
+ grpc_endpoint_delete_from_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set);
}
static char* endpoint_get_peer(grpc_endpoint* secure_ep) {