diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 49 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 6 | ||||
-rw-r--r-- | src/core/surface/server.c | 12 |
3 files changed, 47 insertions, 20 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index dd8eaa943e..eda7e1f6fd 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -150,6 +150,8 @@ struct grpc_call { gpr_uint8 receiving; /* are we currently completing requests */ gpr_uint8 completing; + /** has grpc_call_destroy been called */ + gpr_uint8 destroy_called; /* pairs with completed_requests */ gpr_uint8 num_completed_requests; /* are we currently reading a message? */ @@ -240,6 +242,9 @@ struct grpc_call { gpr_uint32 incoming_message_length; gpr_uint32 incoming_message_flags; grpc_iomgr_closure destroy_closure; + grpc_iomgr_closure on_done_recv; + grpc_iomgr_closure on_done_send; + grpc_iomgr_closure on_done_bind; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -258,6 +263,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata); static void finish_read_ops(grpc_call *call); static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, const char *description); +static void finished_loose_op(void *call, int success); static void lock(grpc_call *call); static void unlock(grpc_call *call); @@ -301,16 +307,18 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, grpc_sopb_init(&call->send_ops); grpc_sopb_init(&call->recv_ops); gpr_slice_buffer_init(&call->incoming_message); - /* dropped in destroy */ - gpr_ref_init(&call->internal_refcount, 1); + grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call); + grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call); + grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call); + /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */ + gpr_ref_init(&call->internal_refcount, 2); /* server hack: start reads immediately so we can get initial metadata. TODO(ctiller): figure out a cleaner solution */ if (!call->is_client) { memset(&initial_op, 0, sizeof(initial_op)); initial_op.recv_ops = &call->recv_ops; initial_op.recv_state = &call->recv_state; - initial_op.on_done_recv = call_on_done_recv; - initial_op.recv_user_data = call; + initial_op.on_done_recv = &call->on_done_recv; initial_op.context = call->context; call->receiving = 1; GRPC_CALL_INTERNAL_REF(call, "receiving"); @@ -450,7 +458,8 @@ static int need_more_data(grpc_call *call) { (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || (call->write_state == WRITE_STATE_INITIAL && !call->is_client) || - (call->cancel_with_status != GRPC_STATUS_OK); + (call->cancel_with_status != GRPC_STATUS_OK) || + call->destroy_called; } static void unlock(grpc_call *call) { @@ -469,8 +478,7 @@ static void unlock(grpc_call *call) { if (!call->receiving && need_more_data(call)) { op.recv_ops = &call->recv_ops; op.recv_state = &call->recv_state; - op.on_done_recv = call_on_done_recv; - op.recv_user_data = call; + op.on_done_recv = &call->on_done_recv; call->receiving = 1; GRPC_CALL_INTERNAL_REF(call, "receiving"); start_op = 1; @@ -797,6 +805,7 @@ static void call_on_done_recv(void *pc, int success) { grpc_alarm_cancel(&call->alarm); call->have_alarm = 0; } + GRPC_CALL_INTERNAL_UNREF(call, "closed", 0); } finish_read_ops(call); } else { @@ -940,8 +949,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { break; } if (op->send_ops) { - op->on_done_send = call_on_done_send; - op->send_user_data = call; + op->on_done_send = &call->on_done_send; } return op->send_ops != NULL; } @@ -1071,6 +1079,8 @@ grpc_call_error grpc_call_start_ioreq_and_call_back( void grpc_call_destroy(grpc_call *c) { int cancel; lock(c); + GPR_ASSERT(!c->destroy_called); + c->destroy_called = 1; if (c->have_alarm) { grpc_alarm_cancel(&c->alarm); c->have_alarm = 0; @@ -1115,14 +1125,31 @@ static void finished_loose_op(void *call, int success_ignored) { GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0); } +typedef struct { + grpc_call *call; + grpc_iomgr_closure closure; +} finished_loose_op_allocated_args; + +static void finished_loose_op_allocated(void *alloc, int success) { + finished_loose_op_allocated_args *args = alloc; + finished_loose_op(args->call, success); + gpr_free(args); +} + static void execute_op(grpc_call *call, grpc_transport_op *op) { grpc_call_element *elem; GPR_ASSERT(op->on_consumed == NULL); if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) { GRPC_CALL_INTERNAL_REF(call, "loose-op"); - op->on_consumed = finished_loose_op; - op->on_consumed_user_data = call; + if (op->bind_pollset) { + op->on_consumed = &call->on_done_bind; + } else { + finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args)); + args->call = call; + grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args); + op->on_consumed = &args->closure; + } } elem = CALL_ELEM_FROM_CALL(call, 0); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index b667128aef..85e1ab5554 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -56,7 +56,7 @@ static void lame_start_transport_op(grpc_call_element *elem, GRPC_CALL_LOG_OP(GPR_INFO, elem, op); if (op->send_ops) { grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); - op->on_done_send(op->send_user_data, 0); + op->on_done_send->cb(op->on_done_send->cb_arg, 0); } if (op->recv_ops) { char tmp[GPR_LTOA_MIN_BUFSIZE]; @@ -75,10 +75,10 @@ static void lame_start_transport_op(grpc_call_element *elem, mdb.deadline = gpr_inf_future; grpc_sopb_add_metadata(op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv(op->recv_user_data, 1); + op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); } if (op->on_consumed) { - op->on_consumed(op->on_consumed_user_data, 0); + op->on_consumed->cb(op->on_consumed->cb_arg, 0); } } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 546b17c1ff..13ec5bee94 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -191,9 +191,9 @@ struct call_data { grpc_stream_op_buffer *recv_ops; grpc_stream_state *recv_state; - void (*on_done_recv)(void *user_data, int success); - void *recv_user_data; + grpc_iomgr_closure *on_done_recv; + grpc_iomgr_closure server_on_recv; grpc_iomgr_closure kill_zombie_closure; call_data **root[CALL_LIST_COUNT]; @@ -523,7 +523,7 @@ static void server_on_recv(void *ptr, int success) { break; } - calld->on_done_recv(calld->recv_user_data, success); + calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { @@ -534,9 +534,7 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { calld->recv_ops = op->recv_ops; calld->recv_state = op->recv_state; calld->on_done_recv = op->on_done_recv; - calld->recv_user_data = op->recv_user_data; - op->on_done_recv = server_on_recv; - op->recv_user_data = elem; + op->on_done_recv = &calld->server_on_recv; } } @@ -632,6 +630,8 @@ static void init_call_elem(grpc_call_element *elem, calld->deadline = gpr_inf_future; calld->call = grpc_call_from_top_element(elem); + grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem); + gpr_mu_lock(&chand->server->mu_call); call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS); gpr_mu_unlock(&chand->server->mu_call); |