diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 82 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 16 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 12 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 6 | ||||
-rw-r--r-- | src/core/surface/server.c | 12 |
5 files changed, 83 insertions, 45 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 5f489c0f4e..bf6fe73775 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -151,12 +151,17 @@ struct grpc_call { gpr_uint8 receiving; /* are we currently completing requests */ gpr_uint8 completing; + /** has grpc_call_destroy been called */ + gpr_uint8 destroy_called; /* pairs with completed_requests */ gpr_uint8 num_completed_requests; /* are we currently reading a message? */ gpr_uint8 reading_message; /* have we bound a pollset yet? */ gpr_uint8 bound_pollset; + /* is an error status set */ + gpr_uint8 error_status_set; + /* flags with bits corresponding to write states allowing us to determine what was sent */ gpr_uint16 last_send_contains; @@ -215,7 +220,7 @@ struct grpc_call { /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; - /** Compression algorithm for the call */ + /* Compression algorithm for the call */ grpc_compression_algorithm compression_algorithm; /* Contexts for various subsystems (security, tracing, ...). */ @@ -241,6 +246,9 @@ struct grpc_call { gpr_uint32 incoming_message_length; gpr_uint32 incoming_message_flags; grpc_iomgr_closure destroy_closure; + grpc_iomgr_closure on_done_recv; + grpc_iomgr_closure on_done_send; + grpc_iomgr_closure on_done_bind; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -259,6 +267,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata); static void finish_read_ops(grpc_call *call); static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, const char *description); +static void finished_loose_op(void *call, int success); static void lock(grpc_call *call); static void unlock(grpc_call *call); @@ -302,16 +311,18 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, grpc_sopb_init(&call->send_ops); grpc_sopb_init(&call->recv_ops); gpr_slice_buffer_init(&call->incoming_message); - /* dropped in destroy */ - gpr_ref_init(&call->internal_refcount, 1); + grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call); + grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call); + grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call); + /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */ + gpr_ref_init(&call->internal_refcount, 2); /* server hack: start reads immediately so we can get initial metadata. TODO(ctiller): figure out a cleaner solution */ if (!call->is_client) { memset(&initial_op, 0, sizeof(initial_op)); initial_op.recv_ops = &call->recv_ops; initial_op.recv_state = &call->recv_state; - initial_op.on_done_recv = call_on_done_recv; - initial_op.recv_user_data = call; + initial_op.on_done_recv = &call->on_done_recv; initial_op.context = call->context; call->receiving = 1; GRPC_CALL_INTERNAL_REF(call, "receiving"); @@ -410,6 +421,7 @@ static void set_status_code(grpc_call *call, status_source source, call->status[source].is_set = 1; call->status[source].code = status; + call->error_status_set = status != GRPC_STATUS_OK; if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) { grpc_bbq_flush(&call->incoming_queue); @@ -451,7 +463,8 @@ static int need_more_data(grpc_call *call) { (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || (call->write_state == WRITE_STATE_INITIAL && !call->is_client) || - (call->cancel_with_status != GRPC_STATUS_OK); + (call->cancel_with_status != GRPC_STATUS_OK) || + call->destroy_called; } static void unlock(grpc_call *call) { @@ -470,8 +483,7 @@ static void unlock(grpc_call *call) { if (!call->receiving && need_more_data(call)) { op.recv_ops = &call->recv_ops; op.recv_state = &call->recv_state; - op.on_done_recv = call_on_done_recv; - op.recv_user_data = call; + op.on_done_recv = &call->on_done_recv; call->receiving = 1; GRPC_CALL_INTERNAL_REF(call, "receiving"); start_op = 1; @@ -687,22 +699,21 @@ static void call_on_done_send(void *pc, int success) { } static void finish_message(grpc_call *call) { - /* TODO(ctiller): this could be a lot faster if coded directly */ - grpc_byte_buffer *byte_buffer; - /* some aliases for readability */ - gpr_slice *slices = call->incoming_message.slices; - const size_t nslices = call->incoming_message.count; - - if (call->compression_algorithm > GRPC_COMPRESS_NONE) { - byte_buffer = grpc_raw_compressed_byte_buffer_create( - slices, nslices, call->compression_algorithm); - } else { - byte_buffer = grpc_raw_byte_buffer_create(slices, nslices); + if (call->error_status_set == 0) { + /* TODO(ctiller): this could be a lot faster if coded directly */ + grpc_byte_buffer *byte_buffer; + /* some aliases for readability */ + gpr_slice *slices = call->incoming_message.slices; + const size_t nslices = call->incoming_message.count; + if (call->compression_algorithm > GRPC_COMPRESS_NONE) { + byte_buffer = grpc_raw_compressed_byte_buffer_create( + slices, nslices, call->compression_algorithm); + } else { + byte_buffer = grpc_raw_byte_buffer_create(slices, nslices); + } + grpc_bbq_push(&call->incoming_queue, byte_buffer); } gpr_slice_buffer_reset_and_unref(&call->incoming_message); - - grpc_bbq_push(&call->incoming_queue, byte_buffer); - GPR_ASSERT(call->incoming_message.count == 0); call->reading_message = 0; } @@ -823,6 +834,7 @@ static void call_on_done_recv(void *pc, int success) { grpc_alarm_cancel(&call->alarm); call->have_alarm = 0; } + GRPC_CALL_INTERNAL_UNREF(call, "closed", 0); } finish_read_ops(call); } else { @@ -966,8 +978,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { break; } if (op->send_ops) { - op->on_done_send = call_on_done_send; - op->send_user_data = call; + op->on_done_send = &call->on_done_send; } return op->send_ops != NULL; } @@ -1097,6 +1108,8 @@ grpc_call_error grpc_call_start_ioreq_and_call_back( void grpc_call_destroy(grpc_call *c) { int cancel; lock(c); + GPR_ASSERT(!c->destroy_called); + c->destroy_called = 1; if (c->have_alarm) { grpc_alarm_cancel(&c->alarm); c->have_alarm = 0; @@ -1141,14 +1154,31 @@ static void finished_loose_op(void *call, int success_ignored) { GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0); } +typedef struct { + grpc_call *call; + grpc_iomgr_closure closure; +} finished_loose_op_allocated_args; + +static void finished_loose_op_allocated(void *alloc, int success) { + finished_loose_op_allocated_args *args = alloc; + finished_loose_op(args->call, success); + gpr_free(args); +} + static void execute_op(grpc_call *call, grpc_transport_op *op) { grpc_call_element *elem; GPR_ASSERT(op->on_consumed == NULL); if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) { GRPC_CALL_INTERNAL_REF(call, "loose-op"); - op->on_consumed = finished_loose_op; - op->on_consumed_user_data = call; + if (op->bind_pollset) { + op->on_consumed = &call->on_done_bind; + } else { + finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args)); + args->call = call; + grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args); + op->on_consumed = &args->closure; + } } elem = CALL_ELEM_FROM_CALL(call, 0); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index bd0fabf9da..030a8b4e6f 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -88,9 +88,11 @@ grpc_completion_queue *grpc_completion_queue_create(void) { } #ifdef GRPC_CQ_REF_COUNT_DEBUG -void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason) { - gpr_log(GPR_DEBUG, "CQ:%p ref %d -> %d %s", cc, (int)cc->owning_refs.count, - (int)cc->owning_refs.count + 1, reason); +void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, + const char *file, int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", + cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, + reason); #else void grpc_cq_internal_ref(grpc_completion_queue *cc) { #endif @@ -103,9 +105,11 @@ static void on_pollset_destroy_done(void *arg) { } #ifdef GRPC_CQ_REF_COUNT_DEBUG -void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason) { - gpr_log(GPR_DEBUG, "CQ:%p unref %d -> %d %s", cc, (int)cc->owning_refs.count, - (int)cc->owning_refs.count - 1, reason); +void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, + const char *file, int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", + cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, + reason); #else void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index e76910c00b..1b9010f462 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -40,10 +40,14 @@ #include <grpc/grpc.h> #ifdef GRPC_CQ_REF_COUNT_DEBUG -void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason); -void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason); -#define GRPC_CQ_INTERNAL_REF(cc, reason) grpc_cq_internal_ref(cc, reason) -#define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc, reason) +void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, + const char *file, int line); +void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, + const char *file, int line); +#define GRPC_CQ_INTERNAL_REF(cc, reason) \ + grpc_cq_internal_ref(cc, reason, __FILE__, __LINE__) +#define GRPC_CQ_INTERNAL_UNREF(cc, reason) \ + grpc_cq_internal_unref(cc, reason, __FILE__, __LINE__) #else void grpc_cq_internal_ref(grpc_completion_queue *cc); void grpc_cq_internal_unref(grpc_completion_queue *cc); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index b667128aef..85e1ab5554 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -56,7 +56,7 @@ static void lame_start_transport_op(grpc_call_element *elem, GRPC_CALL_LOG_OP(GPR_INFO, elem, op); if (op->send_ops) { grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); - op->on_done_send(op->send_user_data, 0); + op->on_done_send->cb(op->on_done_send->cb_arg, 0); } if (op->recv_ops) { char tmp[GPR_LTOA_MIN_BUFSIZE]; @@ -75,10 +75,10 @@ static void lame_start_transport_op(grpc_call_element *elem, mdb.deadline = gpr_inf_future; grpc_sopb_add_metadata(op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv(op->recv_user_data, 1); + op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); } if (op->on_consumed) { - op->on_consumed(op->on_consumed_user_data, 0); + op->on_consumed->cb(op->on_consumed->cb_arg, 0); } } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 076301777b..278ea95488 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -191,9 +191,9 @@ struct call_data { grpc_stream_op_buffer *recv_ops; grpc_stream_state *recv_state; - void (*on_done_recv)(void *user_data, int success); - void *recv_user_data; + grpc_iomgr_closure *on_done_recv; + grpc_iomgr_closure server_on_recv; grpc_iomgr_closure kill_zombie_closure; call_data **root[CALL_LIST_COUNT]; @@ -523,7 +523,7 @@ static void server_on_recv(void *ptr, int success) { break; } - calld->on_done_recv(calld->recv_user_data, success); + calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { @@ -534,9 +534,7 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { calld->recv_ops = op->recv_ops; calld->recv_state = op->recv_state; calld->on_done_recv = op->on_done_recv; - calld->recv_user_data = op->recv_user_data; - op->on_done_recv = server_on_recv; - op->recv_user_data = elem; + op->on_done_recv = &calld->server_on_recv; } } @@ -632,6 +630,8 @@ static void init_call_elem(grpc_call_element *elem, calld->deadline = gpr_inf_future; calld->call = grpc_call_from_top_element(elem); + grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem); + gpr_mu_lock(&chand->server->mu_call); call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS); gpr_mu_unlock(&chand->server->mu_call); |