aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/call.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--src/core/surface/call.c82
1 files changed, 56 insertions, 26 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 5f489c0f4e..bf6fe73775 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -151,12 +151,17 @@ struct grpc_call {
gpr_uint8 receiving;
/* are we currently completing requests */
gpr_uint8 completing;
+ /** has grpc_call_destroy been called */
+ gpr_uint8 destroy_called;
/* pairs with completed_requests */
gpr_uint8 num_completed_requests;
/* are we currently reading a message? */
gpr_uint8 reading_message;
/* have we bound a pollset yet? */
gpr_uint8 bound_pollset;
+ /* is an error status set */
+ gpr_uint8 error_status_set;
+
/* flags with bits corresponding to write states allowing us to determine
what was sent */
gpr_uint16 last_send_contains;
@@ -215,7 +220,7 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
- /** Compression algorithm for the call */
+ /* Compression algorithm for the call */
grpc_compression_algorithm compression_algorithm;
/* Contexts for various subsystems (security, tracing, ...). */
@@ -241,6 +246,9 @@ struct grpc_call {
gpr_uint32 incoming_message_length;
gpr_uint32 incoming_message_flags;
grpc_iomgr_closure destroy_closure;
+ grpc_iomgr_closure on_done_recv;
+ grpc_iomgr_closure on_done_send;
+ grpc_iomgr_closure on_done_bind;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -259,6 +267,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
static void finish_read_ops(grpc_call *call);
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
const char *description);
+static void finished_loose_op(void *call, int success);
static void lock(grpc_call *call);
static void unlock(grpc_call *call);
@@ -302,16 +311,18 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
grpc_sopb_init(&call->send_ops);
grpc_sopb_init(&call->recv_ops);
gpr_slice_buffer_init(&call->incoming_message);
- /* dropped in destroy */
- gpr_ref_init(&call->internal_refcount, 1);
+ grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call);
+ grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call);
+ grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call);
+ /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
+ gpr_ref_init(&call->internal_refcount, 2);
/* server hack: start reads immediately so we can get initial metadata.
TODO(ctiller): figure out a cleaner solution */
if (!call->is_client) {
memset(&initial_op, 0, sizeof(initial_op));
initial_op.recv_ops = &call->recv_ops;
initial_op.recv_state = &call->recv_state;
- initial_op.on_done_recv = call_on_done_recv;
- initial_op.recv_user_data = call;
+ initial_op.on_done_recv = &call->on_done_recv;
initial_op.context = call->context;
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
@@ -410,6 +421,7 @@ static void set_status_code(grpc_call *call, status_source source,
call->status[source].is_set = 1;
call->status[source].code = status;
+ call->error_status_set = status != GRPC_STATUS_OK;
if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
grpc_bbq_flush(&call->incoming_queue);
@@ -451,7 +463,8 @@ static int need_more_data(grpc_call *call) {
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
(call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
- (call->cancel_with_status != GRPC_STATUS_OK);
+ (call->cancel_with_status != GRPC_STATUS_OK) ||
+ call->destroy_called;
}
static void unlock(grpc_call *call) {
@@ -470,8 +483,7 @@ static void unlock(grpc_call *call) {
if (!call->receiving && need_more_data(call)) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
- op.on_done_recv = call_on_done_recv;
- op.recv_user_data = call;
+ op.on_done_recv = &call->on_done_recv;
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
start_op = 1;
@@ -687,22 +699,21 @@ static void call_on_done_send(void *pc, int success) {
}
static void finish_message(grpc_call *call) {
- /* TODO(ctiller): this could be a lot faster if coded directly */
- grpc_byte_buffer *byte_buffer;
- /* some aliases for readability */
- gpr_slice *slices = call->incoming_message.slices;
- const size_t nslices = call->incoming_message.count;
-
- if (call->compression_algorithm > GRPC_COMPRESS_NONE) {
- byte_buffer = grpc_raw_compressed_byte_buffer_create(
- slices, nslices, call->compression_algorithm);
- } else {
- byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
+ if (call->error_status_set == 0) {
+ /* TODO(ctiller): this could be a lot faster if coded directly */
+ grpc_byte_buffer *byte_buffer;
+ /* some aliases for readability */
+ gpr_slice *slices = call->incoming_message.slices;
+ const size_t nslices = call->incoming_message.count;
+ if (call->compression_algorithm > GRPC_COMPRESS_NONE) {
+ byte_buffer = grpc_raw_compressed_byte_buffer_create(
+ slices, nslices, call->compression_algorithm);
+ } else {
+ byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
+ }
+ grpc_bbq_push(&call->incoming_queue, byte_buffer);
}
gpr_slice_buffer_reset_and_unref(&call->incoming_message);
-
- grpc_bbq_push(&call->incoming_queue, byte_buffer);
-
GPR_ASSERT(call->incoming_message.count == 0);
call->reading_message = 0;
}
@@ -823,6 +834,7 @@ static void call_on_done_recv(void *pc, int success) {
grpc_alarm_cancel(&call->alarm);
call->have_alarm = 0;
}
+ GRPC_CALL_INTERNAL_UNREF(call, "closed", 0);
}
finish_read_ops(call);
} else {
@@ -966,8 +978,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
break;
}
if (op->send_ops) {
- op->on_done_send = call_on_done_send;
- op->send_user_data = call;
+ op->on_done_send = &call->on_done_send;
}
return op->send_ops != NULL;
}
@@ -1097,6 +1108,8 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
void grpc_call_destroy(grpc_call *c) {
int cancel;
lock(c);
+ GPR_ASSERT(!c->destroy_called);
+ c->destroy_called = 1;
if (c->have_alarm) {
grpc_alarm_cancel(&c->alarm);
c->have_alarm = 0;
@@ -1141,14 +1154,31 @@ static void finished_loose_op(void *call, int success_ignored) {
GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0);
}
+typedef struct {
+ grpc_call *call;
+ grpc_iomgr_closure closure;
+} finished_loose_op_allocated_args;
+
+static void finished_loose_op_allocated(void *alloc, int success) {
+ finished_loose_op_allocated_args *args = alloc;
+ finished_loose_op(args->call, success);
+ gpr_free(args);
+}
+
static void execute_op(grpc_call *call, grpc_transport_op *op) {
grpc_call_element *elem;
GPR_ASSERT(op->on_consumed == NULL);
if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) {
GRPC_CALL_INTERNAL_REF(call, "loose-op");
- op->on_consumed = finished_loose_op;
- op->on_consumed_user_data = call;
+ if (op->bind_pollset) {
+ op->on_consumed = &call->on_done_bind;
+ } else {
+ finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
+ args->call = call;
+ grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args);
+ op->on_consumed = &args->closure;
+ }
}
elem = CALL_ELEM_FROM_CALL(call, 0);