aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/call.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-17 14:57:44 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-17 14:57:44 -0700
commit8b282cbd0b1df8ae07b563b1a11eecb8e3bbe0a6 (patch)
tree4d4c653b8f351a0ec3bb332403e6c68b2451a6b8 /src/core/surface/call.c
parent9c1043e757a050b789c21498c9c6984133a04d8e (diff)
Got rid of GRPC_SEND_START
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--src/core/surface/call.c36
1 files changed, 27 insertions, 9 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 5facaa503d..0d5fe225d9 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -94,6 +94,8 @@ typedef enum {
/* Status came from the application layer overriding whatever
the wire says */
STATUS_FROM_API_OVERRIDE = 0,
+ /* Status was created by some internal channel stack operation */
+ STATUS_FROM_CORE,
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
@@ -363,6 +365,7 @@ static void request_more_data(grpc_call *call) {
op.flags = 0;
op.done_cb = do_nothing;
op.user_data = NULL;
+ op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
}
@@ -660,15 +663,9 @@ static void enact_send_action(grpc_call *call, send_action sa) {
grpc_metadata_batch_link_head(&op.data.metadata,
&call->send_initial_metadata[i]);
}
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
- op.type = GRPC_SEND_START;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.start.pollset = grpc_cq_pollset(call->cq);
op.done_cb = finish_start_step;
op.user_data = call;
+ op.bind_pollset = grpc_cq_pollset(call->cq);
grpc_call_execute_op(call, &op);
break;
case SEND_BUFFERED_MESSAGE:
@@ -682,6 +679,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
op.data.message = data.send_message;
op.done_cb = finish_write_step;
op.user_data = call;
+ op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
break;
case SEND_TRAILING_METADATA_AND_FINISH:
@@ -694,6 +692,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
call, data.send_metadata.count, data.send_metadata.metadata);
op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
op.data.metadata.deadline = call->send_deadline;
+ op.bind_pollset = NULL;
/* send status */
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
@@ -723,6 +722,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
op.flags = 0;
op.done_cb = finish_finish_step;
op.user_data = call;
+ op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
break;
}
@@ -876,6 +876,7 @@ grpc_call_error grpc_call_cancel(grpc_call *c) {
op.flags = 0;
op.done_cb = do_nothing;
op.user_data = NULL;
+ op.bind_pollset = NULL;
elem = CALL_ELEM_FROM_CALL(c, 0);
elem->filter->call_op(elem, NULL, &op);
@@ -983,6 +984,14 @@ void grpc_call_recv_message(grpc_call_element *elem,
unlock(call);
}
+void grpc_call_recv_synthetic_status(grpc_call_element *elem, grpc_status_code status, const char *message) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ lock(call);
+ set_status_code(call, STATUS_FROM_CORE, status);
+ set_status_details(call, STATUS_FROM_CORE, grpc_mdstr_from_string(call->metadata_context, message));
+ unlock(call);
+}
+
int grpc_call_recv_metadata(grpc_call_element *elem,
grpc_metadata_batch *md) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
@@ -990,6 +999,7 @@ int grpc_call_recv_metadata(grpc_call_element *elem,
grpc_metadata_array *dest;
grpc_metadata *mdusr;
int is_trailing;
+ grpc_mdctx *mdctx = call->metadata_context;
lock(call);
is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
@@ -998,10 +1008,8 @@ int grpc_call_recv_metadata(grpc_call_element *elem,
grpc_mdstr *key = md->key;
if (key == grpc_channel_get_status_string(call->channel)) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
- grpc_mdelem_unref(md);
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
- grpc_mdelem_unref(md);
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
@@ -1022,6 +1030,7 @@ int grpc_call_recv_metadata(grpc_call_element *elem,
sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
}
call->owned_metadata[call->owned_metadata_count++] = md;
+ l->md = 0;
}
}
if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) {
@@ -1032,6 +1041,15 @@ int grpc_call_recv_metadata(grpc_call_element *elem,
}
unlock(call);
+ grpc_mdctx_lock(mdctx);
+ for (l = md->list.head; l; l = l->next) {
+ if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ }
+ for (l = md->garbage.head; l; l = l->next) {
+ grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ }
+ grpc_mdctx_unlock(mdctx);
+
return !is_trailing;
}