aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c49
-rw-r--r--src/core/surface/lame_client.c6
-rw-r--r--src/core/surface/server.c12
3 files changed, 47 insertions, 20 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index dd8eaa943e..eda7e1f6fd 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -150,6 +150,8 @@ struct grpc_call {
gpr_uint8 receiving;
/* are we currently completing requests */
gpr_uint8 completing;
+ /** has grpc_call_destroy been called */
+ gpr_uint8 destroy_called;
/* pairs with completed_requests */
gpr_uint8 num_completed_requests;
/* are we currently reading a message? */
@@ -240,6 +242,9 @@ struct grpc_call {
gpr_uint32 incoming_message_length;
gpr_uint32 incoming_message_flags;
grpc_iomgr_closure destroy_closure;
+ grpc_iomgr_closure on_done_recv;
+ grpc_iomgr_closure on_done_send;
+ grpc_iomgr_closure on_done_bind;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -258,6 +263,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
static void finish_read_ops(grpc_call *call);
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
const char *description);
+static void finished_loose_op(void *call, int success);
static void lock(grpc_call *call);
static void unlock(grpc_call *call);
@@ -301,16 +307,18 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
grpc_sopb_init(&call->send_ops);
grpc_sopb_init(&call->recv_ops);
gpr_slice_buffer_init(&call->incoming_message);
- /* dropped in destroy */
- gpr_ref_init(&call->internal_refcount, 1);
+ grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call);
+ grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call);
+ grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call);
+ /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
+ gpr_ref_init(&call->internal_refcount, 2);
/* server hack: start reads immediately so we can get initial metadata.
TODO(ctiller): figure out a cleaner solution */
if (!call->is_client) {
memset(&initial_op, 0, sizeof(initial_op));
initial_op.recv_ops = &call->recv_ops;
initial_op.recv_state = &call->recv_state;
- initial_op.on_done_recv = call_on_done_recv;
- initial_op.recv_user_data = call;
+ initial_op.on_done_recv = &call->on_done_recv;
initial_op.context = call->context;
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
@@ -450,7 +458,8 @@ static int need_more_data(grpc_call *call) {
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
(call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
- (call->cancel_with_status != GRPC_STATUS_OK);
+ (call->cancel_with_status != GRPC_STATUS_OK) ||
+ call->destroy_called;
}
static void unlock(grpc_call *call) {
@@ -469,8 +478,7 @@ static void unlock(grpc_call *call) {
if (!call->receiving && need_more_data(call)) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
- op.on_done_recv = call_on_done_recv;
- op.recv_user_data = call;
+ op.on_done_recv = &call->on_done_recv;
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
start_op = 1;
@@ -797,6 +805,7 @@ static void call_on_done_recv(void *pc, int success) {
grpc_alarm_cancel(&call->alarm);
call->have_alarm = 0;
}
+ GRPC_CALL_INTERNAL_UNREF(call, "closed", 0);
}
finish_read_ops(call);
} else {
@@ -940,8 +949,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
break;
}
if (op->send_ops) {
- op->on_done_send = call_on_done_send;
- op->send_user_data = call;
+ op->on_done_send = &call->on_done_send;
}
return op->send_ops != NULL;
}
@@ -1071,6 +1079,8 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
void grpc_call_destroy(grpc_call *c) {
int cancel;
lock(c);
+ GPR_ASSERT(!c->destroy_called);
+ c->destroy_called = 1;
if (c->have_alarm) {
grpc_alarm_cancel(&c->alarm);
c->have_alarm = 0;
@@ -1115,14 +1125,31 @@ static void finished_loose_op(void *call, int success_ignored) {
GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0);
}
+typedef struct {
+ grpc_call *call;
+ grpc_iomgr_closure closure;
+} finished_loose_op_allocated_args;
+
+static void finished_loose_op_allocated(void *alloc, int success) {
+ finished_loose_op_allocated_args *args = alloc;
+ finished_loose_op(args->call, success);
+ gpr_free(args);
+}
+
static void execute_op(grpc_call *call, grpc_transport_op *op) {
grpc_call_element *elem;
GPR_ASSERT(op->on_consumed == NULL);
if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) {
GRPC_CALL_INTERNAL_REF(call, "loose-op");
- op->on_consumed = finished_loose_op;
- op->on_consumed_user_data = call;
+ if (op->bind_pollset) {
+ op->on_consumed = &call->on_done_bind;
+ } else {
+ finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
+ args->call = call;
+ grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args);
+ op->on_consumed = &args->closure;
+ }
}
elem = CALL_ELEM_FROM_CALL(call, 0);
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index b667128aef..85e1ab5554 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -56,7 +56,7 @@ static void lame_start_transport_op(grpc_call_element *elem,
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
if (op->send_ops) {
grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
- op->on_done_send(op->send_user_data, 0);
+ op->on_done_send->cb(op->on_done_send->cb_arg, 0);
}
if (op->recv_ops) {
char tmp[GPR_LTOA_MIN_BUFSIZE];
@@ -75,10 +75,10 @@ static void lame_start_transport_op(grpc_call_element *elem,
mdb.deadline = gpr_inf_future;
grpc_sopb_add_metadata(op->recv_ops, mdb);
*op->recv_state = GRPC_STREAM_CLOSED;
- op->on_done_recv(op->recv_user_data, 1);
+ op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
}
if (op->on_consumed) {
- op->on_consumed(op->on_consumed_user_data, 0);
+ op->on_consumed->cb(op->on_consumed->cb_arg, 0);
}
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 546b17c1ff..13ec5bee94 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -191,9 +191,9 @@ struct call_data {
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
- void (*on_done_recv)(void *user_data, int success);
- void *recv_user_data;
+ grpc_iomgr_closure *on_done_recv;
+ grpc_iomgr_closure server_on_recv;
grpc_iomgr_closure kill_zombie_closure;
call_data **root[CALL_LIST_COUNT];
@@ -523,7 +523,7 @@ static void server_on_recv(void *ptr, int success) {
break;
}
- calld->on_done_recv(calld->recv_user_data, success);
+ calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
}
static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
@@ -534,9 +534,7 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
calld->recv_ops = op->recv_ops;
calld->recv_state = op->recv_state;
calld->on_done_recv = op->on_done_recv;
- calld->recv_user_data = op->recv_user_data;
- op->on_done_recv = server_on_recv;
- op->recv_user_data = elem;
+ op->on_done_recv = &calld->server_on_recv;
}
}
@@ -632,6 +630,8 @@ static void init_call_elem(grpc_call_element *elem,
calld->deadline = gpr_inf_future;
calld->call = grpc_call_from_top_element(elem);
+ grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
+
gpr_mu_lock(&chand->server->mu_call);
call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
gpr_mu_unlock(&chand->server->mu_call);