diff options
Diffstat (limited to 'src/core/security/secure_endpoint.c')
-rw-r--r-- | src/core/security/secure_endpoint.c | 92 |
1 files changed, 40 insertions, 52 deletions
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index dcafc0fc0f..a501544341 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -67,9 +67,9 @@ typedef struct { int grpc_trace_secure_endpoint = 0; -static void destroy(secure_endpoint *secure_ep) { +static void destroy(secure_endpoint *secure_ep, grpc_call_list *call_list) { secure_endpoint *ep = secure_ep; - grpc_endpoint_destroy(ep->wrapped_ep); + grpc_endpoint_destroy(ep->wrapped_ep, call_list); tsi_frame_protector_destroy(ep->protector); gpr_slice_buffer_destroy(&ep->leftover_bytes); gpr_slice_unref(ep->read_staging_buffer); @@ -102,11 +102,12 @@ static void secure_endpoint_ref(secure_endpoint *ep, const char *reason, gpr_ref(&ep->ref); } #else -#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep)) +#define SECURE_ENDPOINT_UNREF(ep, reason, cl) secure_endpoint_unref((ep), (cl)) #define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep)) -static void secure_endpoint_unref(secure_endpoint *ep) { +static void secure_endpoint_unref(secure_endpoint *ep, + grpc_call_list *call_list) { if (gpr_unref(&ep->ref)) { - destroy(ep); + destroy(ep, call_list); } } @@ -121,7 +122,8 @@ static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, *end = GPR_SLICE_END_PTR(ep->read_staging_buffer); } -static void call_read_cb(secure_endpoint *ep, int success) { +static void call_read_cb(secure_endpoint *ep, int success, + grpc_call_list *call_list) { if (grpc_trace_secure_endpoint) { size_t i; for (i = 0; i < ep->read_buffer->count; i++) { @@ -132,11 +134,11 @@ static void call_read_cb(secure_endpoint *ep, int success) { } } ep->read_buffer = NULL; - ep->read_cb->cb(ep->read_cb->cb_arg, success); - SECURE_ENDPOINT_UNREF(ep, "read"); + grpc_call_list_add(call_list, ep->read_cb, success); + SECURE_ENDPOINT_UNREF(ep, "read", call_list); } -static int on_read(void *user_data, int success) { +static void on_read(void *user_data, int success, grpc_call_list *call_list) { unsigned i; gpr_uint8 keep_looping = 0; tsi_result result = TSI_OK; @@ -146,7 +148,8 @@ static int on_read(void *user_data, int success) { if (!success) { gpr_slice_buffer_reset_and_unref(ep->read_buffer); - return 0; + call_read_cb(ep, 0, call_list); + return; } /* TODO(yangg) check error, maybe bail out early */ @@ -202,21 +205,16 @@ static int on_read(void *user_data, int success) { if (result != TSI_OK) { gpr_slice_buffer_reset_and_unref(ep->read_buffer); - return 0; + call_read_cb(ep, 0, call_list); + return; } - return 1; -} - -static void on_read_cb(void *user_data, int success) { - call_read_cb(user_data, on_read(user_data, success)); + call_read_cb(ep, 1, call_list); } -static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep, - gpr_slice_buffer *slices, - grpc_closure *cb) { +static void endpoint_read(grpc_endpoint *secure_ep, gpr_slice_buffer *slices, + grpc_closure *cb, grpc_call_list *call_list) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - int immediate_read_success = -1; ep->read_cb = cb; ep->read_buffer = slices; gpr_slice_buffer_reset_and_unref(ep->read_buffer); @@ -224,27 +222,13 @@ static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep, if (ep->leftover_bytes.count) { gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer); GPR_ASSERT(ep->leftover_bytes.count == 0); - return on_read(ep, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; + on_read(ep, 1, call_list); + return; } SECURE_ENDPOINT_REF(ep, "read"); - - switch ( - grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read)) { - case GRPC_ENDPOINT_DONE: - immediate_read_success = on_read(ep, 1); - break; - case GRPC_ENDPOINT_PENDING: - return GRPC_ENDPOINT_PENDING; - case GRPC_ENDPOINT_ERROR: - immediate_read_success = on_read(ep, 0); - break; - } - - GPR_ASSERT(immediate_read_success != -1); - SECURE_ENDPOINT_UNREF(ep, "read"); - - return immediate_read_success ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; + grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, + call_list); } static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, @@ -255,9 +239,8 @@ static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, *end = GPR_SLICE_END_PTR(ep->write_staging_buffer); } -static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep, - gpr_slice_buffer *slices, - grpc_closure *cb) { +static void endpoint_write(grpc_endpoint *secure_ep, gpr_slice_buffer *slices, + grpc_closure *cb, grpc_call_list *call_list) { unsigned i; tsi_result result = TSI_OK; secure_endpoint *ep = (secure_endpoint *)secure_ep; @@ -329,32 +312,37 @@ static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep, if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? */ gpr_slice_buffer_reset_and_unref(&ep->output_buffer); - return GRPC_ENDPOINT_ERROR; + grpc_call_list_add(call_list, cb, 0); + return; } - return grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb); + grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, call_list); } -static void endpoint_shutdown(grpc_endpoint *secure_ep) { +static void endpoint_shutdown(grpc_endpoint *secure_ep, + grpc_call_list *call_list) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - grpc_endpoint_shutdown(ep->wrapped_ep); + grpc_endpoint_shutdown(ep->wrapped_ep, call_list); } -static void endpoint_destroy(grpc_endpoint *secure_ep) { +static void endpoint_destroy(grpc_endpoint *secure_ep, + grpc_call_list *call_list) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - SECURE_ENDPOINT_UNREF(ep, "destroy"); + SECURE_ENDPOINT_UNREF(ep, "destroy", call_list); } static void endpoint_add_to_pollset(grpc_endpoint *secure_ep, - grpc_pollset *pollset) { + grpc_pollset *pollset, + grpc_call_list *call_list) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset); + grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset, call_list); } static void endpoint_add_to_pollset_set(grpc_endpoint *secure_ep, - grpc_pollset_set *pollset_set) { + grpc_pollset_set *pollset_set, + grpc_call_list *call_list) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set); + grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set, call_list); } static char *endpoint_get_peer(grpc_endpoint *secure_ep) { @@ -386,7 +374,7 @@ grpc_endpoint *grpc_secure_endpoint_create( gpr_slice_buffer_init(&ep->output_buffer); gpr_slice_buffer_init(&ep->source_buffer); ep->read_buffer = NULL; - grpc_closure_init(&ep->on_read, on_read_cb, ep); + grpc_closure_init(&ep->on_read, on_read, ep); gpr_mu_init(&ep->protector_mu); gpr_ref_init(&ep->ref, 1); return &ep->base; |