diff options
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r-- | src/core/lib/surface/call.c | 76 |
1 files changed, 60 insertions, 16 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 97d50a91be..7525806583 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -160,6 +160,7 @@ typedef struct { } child_call; struct grpc_call { + gpr_refcount ext_ref; gpr_arena *arena; grpc_completion_queue *cq; grpc_polling_entity pollent; @@ -170,7 +171,7 @@ struct grpc_call { /* client or server call */ bool is_client; - /** has grpc_call_destroy been called */ + /** has grpc_call_unref been called */ bool destroy_called; /** flag indicating that cancellation is inherited */ bool cancellation_is_inherited; @@ -244,6 +245,7 @@ struct grpc_call { }; int grpc_call_error_trace = 0; +int grpc_compression_trace = 0; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) @@ -282,6 +284,10 @@ static void add_init_error(grpc_error **composite, grpc_error *new) { *composite = grpc_error_add_child(*composite, new); } +void *grpc_call_arena_alloc(grpc_call *call, size_t size) { + return gpr_arena_alloc(call->arena, size); +} + static parent_call *get_or_create_parent_call(grpc_call *call) { parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); if (p == NULL) { @@ -312,6 +318,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel)); call = gpr_arena_alloc(arena, sizeof(grpc_call) + channel_stack->call_stack_size); + gpr_ref_init(&call->ext_ref, 1); call->arena = arena; *out_call = call; call->channel = args->channel; @@ -346,6 +353,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, gpr_timespec send_deadline = gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC); + bool immediately_cancel = false; + if (args->parent_call != NULL) { child_call *cc = call->child_call = gpr_arena_alloc(arena, sizeof(child_call)); @@ -386,8 +395,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) { - cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_CANCELLED); + immediately_cancel = true; } } @@ -407,7 +415,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, call->send_deadline = send_deadline; GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); - /* initial refcount dropped by grpc_call_destroy */ + /* initial refcount dropped by grpc_call_unref */ grpc_call_element_args call_args = { .call_stack = CALL_STACK_FROM_CALL(call), .server_transport_data = args->server_transport_data, @@ -422,6 +430,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } + if (immediately_cancel) { + cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); + } if (args->cq != NULL) { GPR_ASSERT( args->pollset_set_alternative == NULL && @@ -528,12 +540,16 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, GPR_TIMER_END("destroy_call", 0); } -void grpc_call_destroy(grpc_call *c) { +void grpc_call_ref(grpc_call *c) { gpr_ref(&c->ext_ref); } + +void grpc_call_unref(grpc_call *c) { + if (!gpr_unref(&c->ext_ref)) return; + child_call *cc = c->child_call; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_TIMER_BEGIN("grpc_call_destroy", 0); - GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); + GPR_TIMER_BEGIN("grpc_call_unref", 0); + GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c)); if (cc) { parent_call *pc = get_parent_call(cc->parent); @@ -560,7 +576,7 @@ void grpc_call_destroy(grpc_call *c) { } GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); grpc_exec_ctx_finish(&exec_ctx); - GPR_TIMER_END("grpc_call_destroy", 0); + GPR_TIMER_END("grpc_call_unref", 0); } grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { @@ -1187,6 +1203,7 @@ static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) { static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, batch_control *bctl) { + grpc_error *error; grpc_call *call = bctl->call; for (;;) { size_t remaining = call->receiving_stream->length - @@ -1198,11 +1215,22 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, finish_batch_step(exec_ctx, bctl); return; } - if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, - &call->receiving_slice, remaining, + if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining, &call->receiving_slice_ready)) { - grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); + error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream, + &call->receiving_slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + call->receiving_slice); + } else { + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + call->receiving_stream = NULL; + grpc_byte_buffer_destroy(*call->receiving_buffer); + *call->receiving_buffer = NULL; + call->receiving_message = 0; + finish_batch_step(exec_ctx, bctl); + return; + } } else { return; } @@ -1213,12 +1241,24 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; + grpc_byte_stream *bs = call->receiving_stream; + bool release_error = false; if (error == GRPC_ERROR_NONE) { - grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); - continue_receiving_slices(exec_ctx, bctl); - } else { + grpc_slice slice; + error = grpc_byte_stream_pull(exec_ctx, bs, &slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + slice); + continue_receiving_slices(exec_ctx, bctl); + } else { + /* Error returned by grpc_byte_stream_pull needs to be released manually + */ + release_error = true; + } + } + + if (error != GRPC_ERROR_NONE) { if (grpc_trace_operation_failures) { GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error)); } @@ -1226,7 +1266,11 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, call->receiving_stream = NULL; grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = NULL; + call->receiving_message = 0; finish_batch_step(exec_ctx, bctl); + if (release_error) { + GRPC_ERROR_UNREF(error); + } } } |