aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/security/secure_endpoint.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/security/secure_endpoint.c')
-rw-r--r--src/core/security/secure_endpoint.c92
1 files changed, 40 insertions, 52 deletions
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index dcafc0fc0f..a501544341 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -67,9 +67,9 @@ typedef struct {
int grpc_trace_secure_endpoint = 0;
-static void destroy(secure_endpoint *secure_ep) {
+static void destroy(secure_endpoint *secure_ep, grpc_call_list *call_list) {
secure_endpoint *ep = secure_ep;
- grpc_endpoint_destroy(ep->wrapped_ep);
+ grpc_endpoint_destroy(ep->wrapped_ep, call_list);
tsi_frame_protector_destroy(ep->protector);
gpr_slice_buffer_destroy(&ep->leftover_bytes);
gpr_slice_unref(ep->read_staging_buffer);
@@ -102,11 +102,12 @@ static void secure_endpoint_ref(secure_endpoint *ep, const char *reason,
gpr_ref(&ep->ref);
}
#else
-#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
+#define SECURE_ENDPOINT_UNREF(ep, reason, cl) secure_endpoint_unref((ep), (cl))
#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
-static void secure_endpoint_unref(secure_endpoint *ep) {
+static void secure_endpoint_unref(secure_endpoint *ep,
+ grpc_call_list *call_list) {
if (gpr_unref(&ep->ref)) {
- destroy(ep);
+ destroy(ep, call_list);
}
}
@@ -121,7 +122,8 @@ static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
*end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
}
-static void call_read_cb(secure_endpoint *ep, int success) {
+static void call_read_cb(secure_endpoint *ep, int success,
+ grpc_call_list *call_list) {
if (grpc_trace_secure_endpoint) {
size_t i;
for (i = 0; i < ep->read_buffer->count; i++) {
@@ -132,11 +134,11 @@ static void call_read_cb(secure_endpoint *ep, int success) {
}
}
ep->read_buffer = NULL;
- ep->read_cb->cb(ep->read_cb->cb_arg, success);
- SECURE_ENDPOINT_UNREF(ep, "read");
+ grpc_call_list_add(call_list, ep->read_cb, success);
+ SECURE_ENDPOINT_UNREF(ep, "read", call_list);
}
-static int on_read(void *user_data, int success) {
+static void on_read(void *user_data, int success, grpc_call_list *call_list) {
unsigned i;
gpr_uint8 keep_looping = 0;
tsi_result result = TSI_OK;
@@ -146,7 +148,8 @@ static int on_read(void *user_data, int success) {
if (!success) {
gpr_slice_buffer_reset_and_unref(ep->read_buffer);
- return 0;
+ call_read_cb(ep, 0, call_list);
+ return;
}
/* TODO(yangg) check error, maybe bail out early */
@@ -202,21 +205,16 @@ static int on_read(void *user_data, int success) {
if (result != TSI_OK) {
gpr_slice_buffer_reset_and_unref(ep->read_buffer);
- return 0;
+ call_read_cb(ep, 0, call_list);
+ return;
}
- return 1;
-}
-
-static void on_read_cb(void *user_data, int success) {
- call_read_cb(user_data, on_read(user_data, success));
+ call_read_cb(ep, 1, call_list);
}
-static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep,
- gpr_slice_buffer *slices,
- grpc_closure *cb) {
+static void endpoint_read(grpc_endpoint *secure_ep, gpr_slice_buffer *slices,
+ grpc_closure *cb, grpc_call_list *call_list) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- int immediate_read_success = -1;
ep->read_cb = cb;
ep->read_buffer = slices;
gpr_slice_buffer_reset_and_unref(ep->read_buffer);
@@ -224,27 +222,13 @@ static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep,
if (ep->leftover_bytes.count) {
gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
GPR_ASSERT(ep->leftover_bytes.count == 0);
- return on_read(ep, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
+ on_read(ep, 1, call_list);
+ return;
}
SECURE_ENDPOINT_REF(ep, "read");
-
- switch (
- grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read)) {
- case GRPC_ENDPOINT_DONE:
- immediate_read_success = on_read(ep, 1);
- break;
- case GRPC_ENDPOINT_PENDING:
- return GRPC_ENDPOINT_PENDING;
- case GRPC_ENDPOINT_ERROR:
- immediate_read_success = on_read(ep, 0);
- break;
- }
-
- GPR_ASSERT(immediate_read_success != -1);
- SECURE_ENDPOINT_UNREF(ep, "read");
-
- return immediate_read_success ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
+ grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read,
+ call_list);
}
static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
@@ -255,9 +239,8 @@ static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
*end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
}
-static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep,
- gpr_slice_buffer *slices,
- grpc_closure *cb) {
+static void endpoint_write(grpc_endpoint *secure_ep, gpr_slice_buffer *slices,
+ grpc_closure *cb, grpc_call_list *call_list) {
unsigned i;
tsi_result result = TSI_OK;
secure_endpoint *ep = (secure_endpoint *)secure_ep;
@@ -329,32 +312,37 @@ static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep,
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
- return GRPC_ENDPOINT_ERROR;
+ grpc_call_list_add(call_list, cb, 0);
+ return;
}
- return grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb);
+ grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, call_list);
}
-static void endpoint_shutdown(grpc_endpoint *secure_ep) {
+static void endpoint_shutdown(grpc_endpoint *secure_ep,
+ grpc_call_list *call_list) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- grpc_endpoint_shutdown(ep->wrapped_ep);
+ grpc_endpoint_shutdown(ep->wrapped_ep, call_list);
}
-static void endpoint_destroy(grpc_endpoint *secure_ep) {
+static void endpoint_destroy(grpc_endpoint *secure_ep,
+ grpc_call_list *call_list) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- SECURE_ENDPOINT_UNREF(ep, "destroy");
+ SECURE_ENDPOINT_UNREF(ep, "destroy", call_list);
}
static void endpoint_add_to_pollset(grpc_endpoint *secure_ep,
- grpc_pollset *pollset) {
+ grpc_pollset *pollset,
+ grpc_call_list *call_list) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
+ grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset, call_list);
}
static void endpoint_add_to_pollset_set(grpc_endpoint *secure_ep,
- grpc_pollset_set *pollset_set) {
+ grpc_pollset_set *pollset_set,
+ grpc_call_list *call_list) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
+ grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set, call_list);
}
static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
@@ -386,7 +374,7 @@ grpc_endpoint *grpc_secure_endpoint_create(
gpr_slice_buffer_init(&ep->output_buffer);
gpr_slice_buffer_init(&ep->source_buffer);
ep->read_buffer = NULL;
- grpc_closure_init(&ep->on_read, on_read_cb, ep);
+ grpc_closure_init(&ep->on_read, on_read, ep);
gpr_mu_init(&ep->protector_mu);
gpr_ref_init(&ep->ref, 1);
return &ep->base;