aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c74
-rw-r--r--src/core/lib/channel/channel_stack.c3
-rw-r--r--src/core/lib/surface/call.c70
3 files changed, 82 insertions, 65 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index da4c7dc7b2..f00570ba74 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1125,8 +1125,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
grpc_transport_stream_op *op = stream_op;
- grpc_chttp2_transport *t = op->handler_private.args[0];
- grpc_chttp2_stream *s = op->handler_private.args[1];
+ grpc_chttp2_stream *s = op->handler_private.extra_arg;
+ grpc_transport_stream_op_payload *op_payload = op->payload;
+ grpc_chttp2_transport *t = s->t;
if (grpc_http_trace) {
char *str = grpc_transport_stream_op_string(op);
@@ -1134,10 +1135,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
op->on_complete);
gpr_free(str);
if (op->send_initial_metadata) {
- log_metadata(op->send_initial_metadata, s->id, t->is_client, true);
+ log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
+ s->id, t->is_client, true);
}
if (op->send_trailing_metadata) {
- log_metadata(op->send_trailing_metadata, s->id, t->is_client, false);
+ log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
+ s->id, t->is_client, false);
}
}
@@ -1152,23 +1155,25 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
on_complete->error_data.error = GRPC_ERROR_NONE;
- if (op->collect_stats != NULL) {
+ if (op->collect_stats) {
GPR_ASSERT(s->collecting_stats == NULL);
- s->collecting_stats = op->collect_stats;
+ s->collecting_stats = op_payload->collect_stats.collect_stats;
on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
}
- if (op->cancel_error != GRPC_ERROR_NONE) {
- grpc_chttp2_cancel_stream(exec_ctx, t, s, op->cancel_error);
+ if (op->cancel_stream) {
+ grpc_chttp2_cancel_stream(exec_ctx, t, s,
+ op_payload->cancel_stream.cancel_error);
}
- if (op->send_initial_metadata != NULL) {
+ if (op->send_initial_metadata) {
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_initial_metadata_finished = add_closure_barrier(on_complete);
- s->send_initial_metadata = op->send_initial_metadata;
+ s->send_initial_metadata =
+ op_payload->send_initial_metadata.send_initial_metadata;
const size_t metadata_size =
- grpc_metadata_batch_size(op->send_initial_metadata);
+ grpc_metadata_batch_size(s->send_initial_metadata);
const size_t metadata_peer_limit =
t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
@@ -1188,7 +1193,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
} else {
- if (contains_non_ok_status(op->send_initial_metadata)) {
+ if (contains_non_ok_status(s->send_initial_metadata)) {
s->seen_error = true;
}
if (!s->write_closed) {
@@ -1222,7 +1227,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
}
- if (op->send_message != NULL) {
+ if (op->send_message) {
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
@@ -1236,14 +1241,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GPR_ASSERT(s->fetching_send_message == NULL);
uint8_t *frame_hdr =
grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5);
- uint32_t flags = op->send_message->flags;
+ uint32_t flags = op_payload->send_message.send_message->flags;
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
- size_t len = op->send_message->length;
+ size_t len = op_payload->send_message.send_message->length;
frame_hdr[1] = (uint8_t)(len >> 24);
frame_hdr[2] = (uint8_t)(len >> 16);
frame_hdr[3] = (uint8_t)(len >> 8);
frame_hdr[4] = (uint8_t)(len);
- s->fetching_send_message = op->send_message;
+ s->fetching_send_message = op_payload->send_message.send_message;
s->fetched_send_message_length = 0;
s->next_message_end_offset = s->flow_controlled_bytes_written +
(int64_t)s->flow_controlled_buffer.length +
@@ -1260,14 +1265,15 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
}
- if (op->send_trailing_metadata != NULL) {
+ if (op->send_trailing_metadata) {
GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
- s->send_trailing_metadata = op->send_trailing_metadata;
+ s->send_trailing_metadata =
+ op_payload->send_trailing_metadata.send_trailing_metadata;
s->write_buffering = false;
const size_t metadata_size =
- grpc_metadata_batch_size(op->send_trailing_metadata);
+ grpc_metadata_batch_size(s->send_trailing_metadata);
const size_t metadata_peer_limit =
t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
@@ -1283,14 +1289,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
} else {
- if (contains_non_ok_status(op->send_trailing_metadata)) {
+ if (contains_non_ok_status(s->send_trailing_metadata)) {
s->seen_error = true;
}
if (s->write_closed) {
s->send_trailing_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_trailing_metadata_finished,
- grpc_metadata_batch_is_empty(op->send_trailing_metadata)
+ grpc_metadata_batch_is_empty(s->send_trailing_metadata)
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
"stream was closed"),
@@ -1305,17 +1311,19 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
}
- if (op->recv_initial_metadata != NULL) {
+ if (op->recv_initial_metadata) {
GPR_ASSERT(s->recv_initial_metadata_ready == NULL);
- s->recv_initial_metadata_ready = op->recv_initial_metadata_ready;
- s->recv_initial_metadata = op->recv_initial_metadata;
+ s->recv_initial_metadata_ready =
+ op_payload->recv_initial_metadata.recv_initial_metadata_ready;
+ s->recv_initial_metadata =
+ op_payload->recv_initial_metadata.recv_initial_metadata;
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
}
- if (op->recv_message != NULL) {
+ if (op->recv_message) {
GPR_ASSERT(s->recv_message_ready == NULL);
- s->recv_message_ready = op->recv_message_ready;
- s->recv_message = op->recv_message;
+ s->recv_message_ready = op_payload->recv_message.recv_message_ready;
+ s->recv_message = op_payload->recv_message.recv_message;
if (s->id != 0 &&
(s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) {
incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
@@ -1323,10 +1331,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
- if (op->recv_trailing_metadata != NULL) {
+ if (op->recv_trailing_metadata) {
GPR_ASSERT(s->recv_trailing_metadata_finished == NULL);
s->recv_trailing_metadata_finished = add_closure_barrier(on_complete);
- s->recv_trailing_metadata = op->recv_trailing_metadata;
+ s->recv_trailing_metadata =
+ op_payload->recv_trailing_metadata.recv_trailing_metadata;
s->final_metadata_requested = true;
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
@@ -1350,8 +1359,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_free(str);
}
- op->handler_private.args[0] = gt;
- op->handler_private.args[1] = gs;
+ op->handler_private.extra_arg = gs;
GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
grpc_closure_sched(
exec_ctx,
@@ -1421,7 +1429,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
void *stream_op,
grpc_error *error_ignored) {
grpc_transport_op *op = stream_op;
- grpc_chttp2_transport *t = op->transport_private.args[0];
+ grpc_chttp2_transport *t = op->transport_private.extra_arg;
grpc_error *close_transport = op->disconnect_with_error;
if (op->on_connectivity_state_change != NULL) {
@@ -1467,7 +1475,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
char *msg = grpc_transport_op_string(op);
gpr_free(msg);
- op->transport_private.args[0] = gt;
+ op->transport_private.extra_arg = gt;
GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
grpc_closure_sched(
exec_ctx, grpc_closure_init(&op->transport_private.closure,
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 3fb2a60ac7..e92e46a9b1 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -293,6 +293,7 @@ void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_error *error) {
grpc_transport_stream_op *op = grpc_make_transport_stream_op(NULL);
- op->cancel_error = error;
+ op->cancel_stream = true;
+ op->payload->cancel_stream.cancel_error = error;
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 1a8dd245d5..2f212686bc 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1137,8 +1137,9 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
/* unrefs bctl->error */
- grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
- finish_batch_completion, bctl, &bctl->completion_data.cq_completion);
+ grpc_cq_end_op(
+ exec_ctx, bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
+ finish_batch_completion, bctl, &bctl->completion_data.cq_completion);
}
}
@@ -1389,10 +1390,13 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
}
bctl->completion_data.notify_tag.tag = notify_tag;
- bctl->completion_data.notify_tag.is_closure = (uint8_t)(is_notify_tag_closure != 0);
+ bctl->completion_data.notify_tag.is_closure =
+ (uint8_t)(is_notify_tag_closure != 0);
gpr_mu_lock(&call->mu);
grpc_transport_stream_op *stream_op = &bctl->op;
+ grpc_transport_stream_op_payload *stream_op_payload =
+ &call->stream_op_payload;
memset(stream_op, 0, sizeof(*stream_op));
stream_op->covered_by_poller = true;
@@ -1447,7 +1451,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
- bctl->op.send_initial_metadata = true;
+ stream_op->send_initial_metadata = true;
call->sent_initial_metadata = true;
if (!prepare_application_metadata(
exec_ctx, call, (int)op->data.send_initial_metadata.count,
@@ -1458,9 +1462,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
/* TODO(ctiller): just make these the same variable? */
call->metadata_batch[0][0].deadline = call->send_deadline;
- call->stream_op_payload.send_initial_metadata.send_initial_metadata =
+ stream_op_payload->send_initial_metadata.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
- call->stream_op_payload.send_initial_metadata.send_initial_metadata_flags = op->flags;
+ stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
+ op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1475,7 +1480,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- bctl->op.send_message = true;
+ stream_op->send_message = true;
call->sending_message = true;
grpc_slice_buffer_stream_init(
&call->sending_stream,
@@ -1488,7 +1493,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_COMPRESS_NONE) {
call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
- call->stream_op_payload.send_message.send_message = &call->sending_stream.base;
+ stream_op_payload->send_message.send_message =
+ &call->sending_stream.base;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
/* Flag validation: currently allow no flags */
@@ -1504,7 +1510,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- bctl->send_final_op = 1;
+ stream_op->send_trailing_metadata = true;
call->sent_final_op = 1;
stream_op->send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
@@ -1528,7 +1534,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
- bctl->send_final_op = 1;
+ stream_op->send_trailing_metadata = true;
call->sent_final_op = 1;
GPR_ASSERT(call->send_extra_metadata_count == 0);
call->send_extra_metadata_count = 1;
@@ -1566,7 +1572,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
- stream_op->send_trailing_metadata =
+ stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
case GRPC_OP_RECV_INITIAL_METADATA:
@@ -1589,10 +1595,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&call->receiving_initial_metadata_ready,
receiving_initial_metadata_ready, bctl,
grpc_schedule_on_exec_ctx);
- bctl->recv_initial_metadata = 1;
- stream_op->recv_initial_metadata =
+ stream_op->recv_initial_metadata = true;
+ stream_op_payload->recv_initial_metadata.recv_initial_metadata =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- stream_op->recv_initial_metadata_ready =
+ stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
num_completion_callbacks_needed++;
break;
@@ -1606,13 +1612,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- call->receiving_message = 1;
- bctl->recv_message = 1;
+ call->receiving_message = true;
+ stream_op->recv_message = true;
call->receiving_buffer = op->data.recv_message.recv_message;
- stream_op->recv_message = &call->receiving_stream;
+ stream_op_payload->recv_message.recv_message = &call->receiving_stream;
grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
bctl, grpc_schedule_on_exec_ctx);
- stream_op->recv_message_ready = &call->receiving_stream_ready;
+ stream_op_payload->recv_message.recv_message_ready =
+ &call->receiving_stream_ready;
num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
@@ -1635,10 +1642,11 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->final_op.client.status = op->data.recv_status_on_client.status;
call->final_op.client.status_details =
op->data.recv_status_on_client.status_details;
- bctl->recv_final_op = 1;
- stream_op->recv_trailing_metadata =
+ stream_op->recv_trailing_metadata = true;
+ stream_op->collect_stats = true;
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op->collect_stats =
+ stream_op_payload->collect_stats.collect_stats =
&call->final_info.stats.transport_stream_stats;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
@@ -1658,10 +1666,11 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->requested_final_op = 1;
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
- bctl->recv_final_op = 1;
- stream_op->recv_trailing_metadata =
+ stream_op->recv_trailing_metadata = true;
+ stream_op->collect_stats = true;
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op->collect_stats =
+ stream_op_payload->collect_stats.collect_stats =
&call->final_info.stats.transport_stream_stats;
break;
}
@@ -1673,7 +1682,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
- stream_op->context = call->context;
grpc_closure_init(&bctl->finish_batch, finish_batch, bctl,
grpc_schedule_on_exec_ctx);
stream_op->on_complete = &bctl->finish_batch;
@@ -1687,25 +1695,25 @@ done:
done_with_error:
/* reverse any mutations that occured */
- if (bctl->send_initial_metadata) {
+ if (stream_op->send_initial_metadata) {
call->sent_initial_metadata = 0;
grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]);
}
- if (bctl->send_message) {
+ if (stream_op->send_message) {
call->sending_message = 0;
grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base);
}
- if (bctl->send_final_op) {
+ if (stream_op->send_trailing_metadata) {
call->sent_final_op = 0;
grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]);
}
- if (bctl->recv_initial_metadata) {
+ if (stream_op->recv_initial_metadata) {
call->received_initial_metadata = 0;
}
- if (bctl->recv_message) {
+ if (stream_op->recv_message) {
call->receiving_message = 0;
}
- if (bctl->recv_final_op) {
+ if (stream_op->recv_trailing_metadata) {
call->requested_final_op = 0;
}
gpr_mu_unlock(&call->mu);