diff options
Diffstat (limited to 'src/core')
35 files changed, 723 insertions, 535 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c index fc29dbd454..bcf59a4efe 100644 --- a/src/core/ext/census/grpc_filter.c +++ b/src/core/ext/census/grpc_filter.c @@ -74,17 +74,18 @@ static void extract_and_annotate_method_tag(grpc_metadata_batch *md, } static void client_mutate_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; if (op->send_initial_metadata) { - extract_and_annotate_method_tag(op->send_initial_metadata, calld, chand); + extract_and_annotate_method_tag( + op->payload->send_initial_metadata.send_initial_metadata, calld, chand); } } static void client_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { client_mutate_op(elem, op); grpc_call_next_op(exec_ctx, elem, op); } @@ -103,19 +104,22 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr, } static void server_mutate_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; if (op->recv_initial_metadata) { /* substitute our callback for the op callback */ - calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->recv_initial_metadata_ready; - op->recv_initial_metadata_ready = &calld->finish_recv; + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; + calld->on_done_recv = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->finish_recv; } } static void server_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { /* TODO(ctiller): this code fails. I don't know why. I expect it's incomplete, and someone should look at it soon. diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 435a3ab0fe..477d0e792b 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -374,8 +374,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, // resolver actually specified. channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); - if (channel_arg != NULL) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); + if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) { grpc_lb_addresses *addresses = channel_arg->value.pointer.p; bool found_backend_address = false; for (size_t i = 0; i < addresses->num_addresses; ++i) { @@ -533,7 +532,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { grpc_transport_op *op = arg; - grpc_channel_element *elem = op->transport_private.args[0]; + grpc_channel_element *elem = op->handler_private.extra_arg; channel_data *chand = elem->channel_data; if (op->on_connectivity_state_change != NULL) { @@ -595,12 +594,12 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, op->bind_pollset); } - op->transport_private.args[0] = elem; + op->handler_private.extra_arg = elem; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); grpc_closure_sched( - exec_ctx, grpc_closure_init( - &op->transport_private.closure, start_transport_op_locked, - op, grpc_combiner_scheduler(chand->combiner, false)), + exec_ctx, + grpc_closure_init(&op->handler_private.closure, start_transport_op_locked, + op, grpc_combiner_scheduler(chand->combiner, false)), GRPC_ERROR_NONE); } @@ -643,14 +642,26 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, // Record client channel factory. const grpc_arg *arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_CLIENT_CHANNEL_FACTORY); - GPR_ASSERT(arg != NULL); - GPR_ASSERT(arg->type == GRPC_ARG_POINTER); + if (arg == NULL) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Missing client channel factory in args for client channel filter"); + } + if (arg->type != GRPC_ARG_POINTER) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "client channel factory arg must be a pointer"); + } grpc_client_channel_factory_ref(arg->value.pointer.p); chand->client_channel_factory = arg->value.pointer.p; // Get server name to resolve, using proxy mapper if needed. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); - GPR_ASSERT(arg != NULL); - GPR_ASSERT(arg->type == GRPC_ARG_STRING); + if (arg == NULL) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Missing server uri in args for client channel filter"); + } + if (arg->type != GRPC_ARG_STRING) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "server uri arg must be a string"); + } char *proxy_name = NULL; grpc_channel_args *new_args = NULL; grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args, @@ -755,7 +766,7 @@ typedef struct client_channel_call_data { grpc_connected_subchannel *connected_subchannel; grpc_polling_entity *pollent; - grpc_transport_stream_op **waiting_ops; + grpc_transport_stream_op_batch **waiting_ops; size_t waiting_ops_count; size_t waiting_ops_capacity; @@ -775,7 +786,8 @@ grpc_subchannel_call *grpc_client_channel_get_subchannel_call( return scc == CANCELLED_CALL ? NULL : scc; } -static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) { +static void add_waiting_locked(call_data *calld, + grpc_transport_stream_op_batch *op) { GPR_TIMER_BEGIN("add_waiting_locked", 0); if (calld->waiting_ops_count == calld->waiting_ops_capacity) { calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity); @@ -791,7 +803,7 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, grpc_error *error) { size_t i; for (i = 0; i < calld->waiting_ops_count; i++) { - grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error)); } calld->waiting_ops_count = 0; @@ -804,7 +816,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { } grpc_subchannel_call *call = GET_CALL(calld); - grpc_transport_stream_op **ops = calld->waiting_ops; + grpc_transport_stream_op_batch **ops = calld->waiting_ops; size_t nops = calld->waiting_ops_count; if (call == CANCELLED_CALL) { fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED); @@ -1052,9 +1064,9 @@ static bool pick_subchannel_locked( return false; } -static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, - grpc_transport_stream_op *op, - grpc_call_element *elem) { +static void start_transport_stream_op_batch_locked_inner( + grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, + grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; grpc_subchannel_call *call; @@ -1062,7 +1074,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, /* need to recheck that another thread hasn't set the call */ call = GET_CALL(calld); if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); /* early out */ return; @@ -1073,11 +1085,11 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, return; } /* if this is a cancellation, then we can raise our cancelled flag */ - if (op->cancel_error != GRPC_ERROR_NONE) { + if (op->cancel_stream) { if (!gpr_atm_rel_cas(&calld->subchannel_call, 0, (gpr_atm)(uintptr_t)CANCELLED_CALL)) { /* recurse to retry */ - start_transport_stream_op_locked_inner(exec_ctx, op, elem); + start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem); /* early out */ return; } else { @@ -1086,27 +1098,29 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, cancelled before any ops are passed down (e.g., if the deadline is in the past when the call starts), we can return the right error to the caller when the first op does get passed down. */ - calld->cancel_error = GRPC_ERROR_REF(op->cancel_error); + calld->cancel_error = + GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error); switch (calld->creation_phase) { case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: - fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error)); + fail_locked(exec_ctx, calld, + GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: - pick_subchannel_locked(exec_ctx, elem, NULL, 0, - &calld->connected_subchannel, NULL, - GRPC_ERROR_REF(op->cancel_error)); + pick_subchannel_locked( + exec_ctx, elem, NULL, 0, &calld->connected_subchannel, NULL, + GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); break; } - grpc_transport_stream_op_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(op->cancel_error)); + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, op, + GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); /* early out */ return; } } /* if we don't have a subchannel, try to get one */ if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && - calld->connected_subchannel == NULL && - op->send_initial_metadata != NULL) { + calld->connected_subchannel == NULL && op->send_initial_metadata) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, grpc_combiner_scheduler(chand->combiner, true)); @@ -1114,10 +1128,11 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so that IO of the lb_policy and resolver could be done under it. */ - if (pick_subchannel_locked(exec_ctx, elem, op->send_initial_metadata, - op->send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step, - GRPC_ERROR_NONE)) { + if (pick_subchannel_locked( + exec_ctx, elem, + op->payload->send_initial_metadata.send_initial_metadata, + op->payload->send_initial_metadata.send_initial_metadata_flags, + &calld->connected_subchannel, &calld->next_step, GRPC_ERROR_NONE)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } else { @@ -1140,13 +1155,13 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); } gpr_atm_rel_store(&calld->subchannel_call, (gpr_atm)(uintptr_t)subchannel_call); retry_waiting_locked(exec_ctx, calld); /* recurse to retry */ - start_transport_stream_op_locked_inner(exec_ctx, op, elem); + start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem); /* early out */ return; } @@ -1174,15 +1189,16 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GRPC_ERROR_REF(error)); } -static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error_ignored) { - GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0); +static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_ignored) { + GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0); - grpc_transport_stream_op *op = arg; - grpc_call_element *elem = op->handler_private.args[0]; + grpc_transport_stream_op_batch *op = arg; + grpc_call_element *elem = op->handler_private.extra_arg; call_data *calld = elem->call_data; - if (op->recv_trailing_metadata != NULL) { + if (op->recv_trailing_metadata) { GPR_ASSERT(op->on_complete != NULL); calld->original_on_complete = op->on_complete; grpc_closure_init(&calld->on_complete, on_complete, elem, @@ -1190,11 +1206,11 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, op->on_complete = &calld->on_complete; } - start_transport_stream_op_locked_inner(exec_ctx, op, elem); + start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, - "start_transport_stream_op"); - GPR_TIMER_END("start_transport_stream_op_locked", 0); + "start_transport_stream_op_batch"); + GPR_TIMER_END("start_transport_stream_op_batch_locked", 0); } /* The logic here is fairly complicated, due to (a) the fact that we @@ -1205,39 +1221,40 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, We use double-checked locking to initially see if initialization has been performed. If it has not, we acquire the combiner and perform initialization. If it has, we proceed on the fast path. */ -static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void cc_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); + grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, + op); /* try to (atomically) get the call */ grpc_subchannel_call *call = GET_CALL(calld); - GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); + GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); /* early out */ return; } if (call != NULL) { grpc_subchannel_call_process_op(exec_ctx, call, op); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); /* early out */ return; } /* we failed; lock and figure out what to do */ - GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op"); - op->handler_private.args[0] = elem; + GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch"); + op->handler_private.extra_arg = elem; grpc_closure_sched( exec_ctx, grpc_closure_init(&op->handler_private.closure, - start_transport_stream_op_locked, op, + start_transport_stream_op_batch_locked, op, grpc_combiner_scheduler(chand->combiner, false)), GRPC_ERROR_NONE); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ @@ -1296,7 +1313,7 @@ static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, */ const grpc_channel_filter grpc_client_channel_filter = { - cc_start_transport_stream_op, + cc_start_transport_stream_op_batch, cc_start_transport_op, sizeof(call_data), cc_init_call_elem, diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 063c0badff..3a2cb00ca7 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -360,7 +360,6 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, for (size_t i = 0; i < c->args->num_args; i++) { if (0 == strcmp(c->args->args[i].key, "grpc.testing.fixed_reconnect_backoff_ms")) { - GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); fixed_reconnect_backoff = true; initial_backoff_ms = min_backoff_ms = max_backoff_ms = grpc_channel_arg_get_integer( @@ -749,11 +748,11 @@ char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0); grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); - top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op); + top_elem->filter->start_transport_stream_op_batch(exec_ctx, top_elem, op); GPR_TIMER_END("grpc_subchannel_call_process_op", 0); } diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 3e64a2507c..ba96c92df8 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -157,7 +157,7 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( /** continue processing a transport op */ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call, - grpc_transport_stream_op *op); + grpc_transport_stream_op_batch *op); /** continue querying for peer */ char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 601b0e643b..629d81953c 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -847,7 +847,9 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, * this is the right LB policy to use. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + return NULL; + } grpc_lb_addresses *addresses = arg->value.pointer.p; size_t num_grpclb_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index fc65dfdcb9..8c73ea6014 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -402,7 +402,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, * addresses, since we don't know how to handle them. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + return NULL; + } grpc_lb_addresses *addresses = arg->value.pointer.p; size_t num_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index a62082a2ff..8665b5c6c8 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -691,7 +691,9 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, * addresses, since we don't know how to handle them. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + return NULL; + } grpc_lb_addresses *addresses = arg->value.pointer.p; size_t num_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index ea57c85c3a..2e7b3580c3 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -183,25 +183,28 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, */ } -static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { - GPR_TIMER_BEGIN("lr_start_transport_stream_op", 0); +static void lr_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { + GPR_TIMER_BEGIN("lr_start_transport_stream_op_batch", 0); call_data *calld = elem->call_data; if (op->recv_initial_metadata) { - calld->recv_initial_metadata = op->recv_initial_metadata; + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; /* substitute our callback for the higher callback */ - calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready; - op->recv_initial_metadata_ready = &calld->on_initial_md_ready; + calld->ops_recv_initial_metadata_ready = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->on_initial_md_ready; } grpc_call_next_op(exec_ctx, elem, op); - GPR_TIMER_END("lr_start_transport_stream_op", 0); + GPR_TIMER_END("lr_start_transport_stream_op_batch", 0); } const grpc_channel_filter grpc_load_reporting_filter = { - lr_start_transport_stream_op, + lr_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 73f9454f7a..a8e320d037 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1102,7 +1102,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, return; /* early out */ } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, &s->fetching_slice, UINT32_MAX, - &s->complete_fetch)) { + &s->complete_fetch_locked)) { add_fetched_slice_locked(exec_ctx, t, s); } } @@ -1140,20 +1140,23 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_error *error_ignored) { GPR_TIMER_BEGIN("perform_stream_op_locked", 0); - grpc_transport_stream_op *op = stream_op; - grpc_chttp2_transport *t = op->handler_private.args[0]; - grpc_chttp2_stream *s = op->handler_private.args[1]; + grpc_transport_stream_op_batch *op = stream_op; + grpc_chttp2_stream *s = op->handler_private.extra_arg; + grpc_transport_stream_op_batch_payload *op_payload = op->payload; + grpc_chttp2_transport *t = s->t; if (grpc_http_trace) { - char *str = grpc_transport_stream_op_string(op); + char *str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str, op->on_complete); gpr_free(str); if (op->send_initial_metadata) { - log_metadata(op->send_initial_metadata, s->id, t->is_client, true); + log_metadata(op_payload->send_initial_metadata.send_initial_metadata, + s->id, t->is_client, true); } if (op->send_trailing_metadata) { - log_metadata(op->send_trailing_metadata, s->id, t->is_client, false); + log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata, + s->id, t->is_client, false); } } @@ -1168,23 +1171,25 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT; on_complete->error_data.error = GRPC_ERROR_NONE; - if (op->collect_stats != NULL) { + if (op->collect_stats) { GPR_ASSERT(s->collecting_stats == NULL); - s->collecting_stats = op->collect_stats; + s->collecting_stats = op_payload->collect_stats.collect_stats; on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT; } - if (op->cancel_error != GRPC_ERROR_NONE) { - grpc_chttp2_cancel_stream(exec_ctx, t, s, op->cancel_error); + if (op->cancel_stream) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, + op_payload->cancel_stream.cancel_error); } - if (op->send_initial_metadata != NULL) { + if (op->send_initial_metadata) { GPR_ASSERT(s->send_initial_metadata_finished == NULL); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->send_initial_metadata_finished = add_closure_barrier(on_complete); - s->send_initial_metadata = op->send_initial_metadata; + s->send_initial_metadata = + op_payload->send_initial_metadata.send_initial_metadata; const size_t metadata_size = - grpc_metadata_batch_size(op->send_initial_metadata); + grpc_metadata_batch_size(s->send_initial_metadata); const size_t metadata_peer_limit = t->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; @@ -1205,7 +1210,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); } else { - if (contains_non_ok_status(op->send_initial_metadata)) { + if (contains_non_ok_status(s->send_initial_metadata)) { s->seen_error = true; } if (!s->write_closed) { @@ -1225,8 +1230,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_ASSERT(s->id != 0); grpc_chttp2_stream_write_type write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED; - if (op->send_message != NULL && - (op->send_message->flags & GRPC_WRITE_BUFFER_HINT)) { + if (op->send_message && + (op->payload->send_message.send_message->flags & + GRPC_WRITE_BUFFER_HINT)) { write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK; } grpc_chttp2_become_writable(exec_ctx, t, s, write_type, @@ -1244,7 +1250,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } - if (op->send_message != NULL) { + if (op->send_message) { on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->fetching_send_message_finished = add_closure_barrier(op->on_complete); if (s->write_closed) { @@ -1258,14 +1264,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_ASSERT(s->fetching_send_message == NULL); uint8_t *frame_hdr = grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5); - uint32_t flags = op->send_message->flags; + uint32_t flags = op_payload->send_message.send_message->flags; frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; - size_t len = op->send_message->length; + size_t len = op_payload->send_message.send_message->length; frame_hdr[1] = (uint8_t)(len >> 24); frame_hdr[2] = (uint8_t)(len >> 16); frame_hdr[3] = (uint8_t)(len >> 8); frame_hdr[4] = (uint8_t)(len); - s->fetching_send_message = op->send_message; + s->fetching_send_message = op_payload->send_message.send_message; s->fetched_send_message_length = 0; s->next_message_end_offset = s->flow_controlled_bytes_written + (int64_t)s->flow_controlled_buffer.length + @@ -1282,14 +1288,15 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } - if (op->send_trailing_metadata != NULL) { + if (op->send_trailing_metadata) { GPR_ASSERT(s->send_trailing_metadata_finished == NULL); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->send_trailing_metadata_finished = add_closure_barrier(on_complete); - s->send_trailing_metadata = op->send_trailing_metadata; + s->send_trailing_metadata = + op_payload->send_trailing_metadata.send_trailing_metadata; s->write_buffering = false; const size_t metadata_size = - grpc_metadata_batch_size(op->send_trailing_metadata); + grpc_metadata_batch_size(s->send_trailing_metadata); const size_t metadata_peer_limit = t->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; @@ -1306,14 +1313,15 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); } else { - if (contains_non_ok_status(op->send_trailing_metadata)) { + if (contains_non_ok_status(s->send_trailing_metadata)) { s->seen_error = true; } if (s->write_closed) { s->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_trailing_metadata_finished, - grpc_metadata_batch_is_empty(op->send_trailing_metadata) + grpc_metadata_batch_is_empty( + op->payload->send_trailing_metadata.send_trailing_metadata) ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Attempt to send trailing metadata after " @@ -1329,17 +1337,19 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } - if (op->recv_initial_metadata != NULL) { + if (op->recv_initial_metadata) { GPR_ASSERT(s->recv_initial_metadata_ready == NULL); - s->recv_initial_metadata_ready = op->recv_initial_metadata_ready; - s->recv_initial_metadata = op->recv_initial_metadata; + s->recv_initial_metadata_ready = + op_payload->recv_initial_metadata.recv_initial_metadata_ready; + s->recv_initial_metadata = + op_payload->recv_initial_metadata.recv_initial_metadata; grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); } - if (op->recv_message != NULL) { + if (op->recv_message) { GPR_ASSERT(s->recv_message_ready == NULL); - s->recv_message_ready = op->recv_message_ready; - s->recv_message = op->recv_message; + s->recv_message_ready = op_payload->recv_message.recv_message_ready; + s->recv_message = op_payload->recv_message.recv_message; if (s->id != 0 && (s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) { incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0); @@ -1347,10 +1357,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } - if (op->recv_trailing_metadata != NULL) { + if (op->recv_trailing_metadata) { GPR_ASSERT(s->recv_trailing_metadata_finished == NULL); s->recv_trailing_metadata_finished = add_closure_barrier(on_complete); - s->recv_trailing_metadata = op->recv_trailing_metadata; + s->recv_trailing_metadata = + op_payload->recv_trailing_metadata.recv_trailing_metadata; s->final_metadata_requested = true; grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } @@ -1363,19 +1374,19 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_transport_stream_op *op) { + grpc_stream *gs, + grpc_transport_stream_op_batch *op) { GPR_TIMER_BEGIN("perform_stream_op", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; if (grpc_http_trace) { - char *str = grpc_transport_stream_op_string(op); + char *str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_DEBUG, "perform_stream_op[s=%p/%d]: %s", s, s->id, str); gpr_free(str); } - op->handler_private.args[0] = gt; - op->handler_private.args[1] = gs; + op->handler_private.extra_arg = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); grpc_closure_sched( exec_ctx, @@ -1452,7 +1463,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_error *error_ignored) { grpc_transport_op *op = stream_op; - grpc_chttp2_transport *t = op->transport_private.args[0]; + grpc_chttp2_transport *t = op->handler_private.extra_arg; grpc_error *close_transport = op->disconnect_with_error; if (op->on_connectivity_state_change != NULL) { @@ -1498,10 +1509,10 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; char *msg = grpc_transport_op_string(op); gpr_free(msg); - op->transport_private.args[0] = gt; + op->handler_private.extra_arg = gt; GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); grpc_closure_sched( - exec_ctx, grpc_closure_init(&op->transport_private.closure, + exec_ctx, grpc_closure_init(&op->handler_private.closure, perform_transport_op_locked, op, grpc_combiner_scheduler(t->combiner, false)), GRPC_ERROR_NONE); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 8b718e963c..50993e4ecd 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -452,7 +452,6 @@ struct grpc_chttp2_stream { int64_t next_message_end_offset; int64_t flow_controlled_bytes_written; bool complete_fetch_covered_by_poller; - grpc_closure complete_fetch; grpc_closure complete_fetch_locked; grpc_closure *fetching_send_message_finished; diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 952690eba7..9bd8914b98 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -172,7 +172,7 @@ struct op_state { }; struct op_and_state { - grpc_transport_stream_op op; + grpc_transport_stream_op_batch op; struct op_state state; bool done; struct stream_obj *s; /* Pointer back to the stream object */ @@ -187,7 +187,7 @@ struct op_storage { struct stream_obj { gpr_arena *arena; struct op_and_state *oas; - grpc_transport_stream_op *curr_op; + grpc_transport_stream_op_batch *curr_op; grpc_cronet_transport *curr_ct; grpc_stream *curr_gs; bidirectional_stream *cbs; @@ -298,12 +298,13 @@ static grpc_error *make_error_with_desc(int error_code, const char *desc) { /* Add a new stream op to op storage. */ -static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { +static void add_to_storage(struct stream_obj *s, + grpc_transport_stream_op_batch *op) { struct op_storage *storage = &s->storage; /* add new op at the beginning of the linked list. The memory is freed in remove_from_storage */ struct op_and_state *new_op = gpr_malloc(sizeof(struct op_and_state)); - memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op)); + memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch)); memset(&new_op->state, 0, sizeof(new_op->state)); new_op->s = s; new_op->done = false; @@ -768,7 +769,7 @@ static bool header_has_authority(grpc_linked_mdelem *head) { Op Execution: Decide if one of the actions contained in the stream op can be executed. This is the heart of the state machine. */ -static bool op_can_be_run(grpc_transport_stream_op *curr_op, +static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, struct stream_obj *s, struct op_state *op_state, enum e_op_id op_id) { struct op_state *stream_state = &s->state; @@ -919,7 +920,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, */ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, struct op_and_state *oas) { - grpc_transport_stream_op *stream_op = &oas->op; + grpc_transport_stream_op_batch *stream_op = &oas->op; struct stream_obj *s = oas->s; grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; struct op_state *stream_state = &s->state; @@ -941,9 +942,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, char *url = NULL; const char *method = "POST"; s->header_array.headers = NULL; - convert_metadata_to_cronet_headers( - stream_op->send_initial_metadata->list.head, t->host, &url, - &s->header_array.headers, &s->header_array.count, &method); + convert_metadata_to_cronet_headers(stream_op->payload->send_initial_metadata + .send_initial_metadata->list.head, + t->host, &url, &s->header_array.headers, + &s->header_array.count, &method); s->header_array.capacity = s->header_array.count; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); @@ -971,8 +973,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_slice_buffer write_slice_buffer; grpc_slice slice; grpc_slice_buffer_init(&write_slice_buffer); - grpc_byte_stream_next(NULL, stream_op->send_message, &slice, - stream_op->send_message->length, NULL); + grpc_byte_stream_next( + NULL, stream_op->payload->send_message.send_message, &slice, + stream_op->payload->send_message.send_message->length, NULL); grpc_slice_buffer_add(&write_slice_buffer, slice); if (write_slice_buffer.count != 1) { /* Empty request not handled yet */ @@ -982,7 +985,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (write_slice_buffer.count > 0) { size_t write_buffer_size; create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, - &write_buffer_size, stream_op->send_message->flags); + &write_buffer_size, + stream_op->payload->send_message.send_message->flags); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, stream_state->ws.write_buffer); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; @@ -1029,20 +1033,28 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, OP_RECV_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, + stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, + stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, + stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); } else { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &oas->s->state.rs.initial_metadata, - stream_op->recv_initial_metadata); - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + stream_op->payload->recv_initial_metadata.recv_initial_metadata); + grpc_closure_sched( + exec_ctx, + stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); } stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -1051,27 +1063,31 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->state_callback_received[OP_FAILED]) { CRONET_LOG(GPR_DEBUG, "Stream failed."); - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.read_stream_closed == true) { /* No more data will be received */ CRONET_LOG(GPR_DEBUG, "read stream closed"); - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->flush_read) { CRONET_LOG(GPR_DEBUG, "flush read"); - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1110,8 +1126,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, stream_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1163,7 +1180,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1187,12 +1205,12 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (oas->s->state.rs.trailing_metadata_valid) { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &oas->s->state.rs.trailing_metadata, - stream_op->recv_trailing_metadata); + stream_op->payload->recv_trailing_metadata.recv_trailing_metadata); stream_state->rs.trailing_metadata_valid = false; } stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; - } else if (stream_op->cancel_error && + } else if (stream_op->cancel_stream && op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); @@ -1204,7 +1222,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } stream_state->state_op_done[OP_CANCEL_ERROR] = true; if (!stream_state->cancel_error) { - stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error); + stream_state->cancel_error = + GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error); } } else if (stream_op->on_complete && op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) { @@ -1283,18 +1302,22 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set) {} static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_transport_stream_op *op) { + grpc_stream *gs, + grpc_transport_stream_op_batch *op) { CRONET_LOG(GPR_DEBUG, "perform_stream_op"); if (op->send_initial_metadata && - header_has_authority(op->send_initial_metadata->list.head)) { + header_has_authority(op->payload->send_initial_metadata + .send_initial_metadata->list.head)) { /* Cronet does not support :authority header field. We cancel the call when this field is present in metadata */ - if (op->recv_initial_metadata_ready) { - grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready, - GRPC_ERROR_CANCELLED); + if (op->recv_initial_metadata) { + grpc_closure_sched( + exec_ctx, + op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_CANCELLED); } - if (op->recv_message_ready) { - grpc_closure_sched(exec_ctx, op->recv_message_ready, + if (op->recv_message) { + grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready, GRPC_ERROR_CANCELLED); } grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 6d53b0576e..94382980eb 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -246,9 +246,9 @@ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, } void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { grpc_call_element *next_elem = elem + 1; - next_elem->filter->start_transport_stream_op(exec_ctx, next_elem, op); + next_elem->filter->start_transport_stream_op_batch(exec_ctx, next_elem, op); } char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, @@ -284,7 +284,8 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_error *error) { - grpc_transport_stream_op *op = grpc_make_transport_stream_op(NULL); - op->cancel_error = error; - elem->filter->start_transport_stream_op(exec_ctx, elem, op); + grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(NULL); + op->cancel_stream = true; + op->payload->cancel_stream.cancel_error = error; + elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op); } diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 80e3603e8d..fdbcbdb018 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -112,9 +112,9 @@ typedef struct { typedef struct { /* Called to eg. send/receive data on a call. See grpc_call_next_op on how to call the next element in the stack */ - void (*start_transport_stream_op)(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op); + void (*start_transport_stream_op_batch)(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *op); /* Called to handle channel level operations - e.g. new calls, or transport closure. See grpc_channel_next_op on how to call the next element in the stack */ @@ -281,7 +281,7 @@ void grpc_call_stack_ignore_set_pollset_or_pollset_set( grpc_polling_entity *pollent); /* Call the next operation in a call stack */ void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op); + grpc_transport_stream_op_batch *op); /* Call the next operation (depending on call directionality) in a channel stack */ void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, @@ -300,7 +300,8 @@ grpc_channel_stack *grpc_channel_stack_from_top_element( grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem); void grpc_call_log_op(char *file, int line, gpr_log_severity severity, - grpc_call_element *elem, grpc_transport_stream_op *op); + grpc_call_element *elem, + grpc_transport_stream_op_batch *op); void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx, grpc_call_element *cur_elem, diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 02dc479f3a..4625cba0d2 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -62,7 +62,7 @@ typedef struct call_data { /** If true, contents of \a compression_algorithm are authoritative */ int has_compression_algorithm; - grpc_transport_stream_op *send_op; + grpc_transport_stream_op_batch *send_op; uint32_t send_length; uint32_t send_flags; grpc_slice incoming_slice; @@ -210,7 +210,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, calld->send_flags); - calld->send_op->send_message = &calld->replacement_stream.base; + calld->send_op->payload->send_message.send_message = + &calld->replacement_stream.base; calld->post_send = calld->send_op->on_complete; calld->send_op->on_complete = &calld->send_done; @@ -231,9 +232,9 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { static void continue_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; - while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message, - &calld->incoming_slice, ~(size_t)0, - &calld->got_slice)) { + while (grpc_byte_stream_next( + exec_ctx, calld->send_op->payload->send_message.send_message, + &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); @@ -242,33 +243,34 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, } } -static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void compress_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; - GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0); + GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); if (op->send_initial_metadata) { grpc_error *error = process_send_initial_metadata( - exec_ctx, elem, op->send_initial_metadata); + exec_ctx, elem, + op->payload->send_initial_metadata.send_initial_metadata); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); return; } } - if (op->send_message != NULL && - !skip_compression(elem, op->send_message->flags)) { + if (op->send_message && + !skip_compression(elem, op->payload->send_message.send_message->flags)) { calld->send_op = op; - calld->send_length = op->send_message->length; - calld->send_flags = op->send_message->flags; + calld->send_length = op->payload->send_message.send_message->length; + calld->send_flags = op->payload->send_message.send_message->flags; continue_send_message(exec_ctx, elem); } else { /* pass control down the stack */ grpc_call_next_op(exec_ctx, elem, op); } - GPR_TIMER_END("compress_start_transport_stream_op", 0); + GPR_TIMER_END("compress_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ @@ -337,7 +339,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} const grpc_channel_filter grpc_compress_filter = { - compress_start_transport_stream_op, + compress_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 75c68a5534..22caf24373 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -62,9 +62,9 @@ typedef struct connected_channel_call_data { void *unused; } call_data; /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ -static void con_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void con_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -142,7 +142,7 @@ static void con_get_channel_info(grpc_exec_ctx *exec_ctx, const grpc_channel_info *channel_info) {} const grpc_channel_filter grpc_connected_filter = { - con_start_transport_stream_op, + con_start_transport_stream_op_batch, con_start_transport_op, sizeof(call_data), init_call_elem, diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index ca701ed457..fda099b021 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -134,7 +134,7 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { // Inject our own on_complete callback into op. static void inject_on_complete_cb(grpc_deadline_state* deadline_state, - grpc_transport_stream_op* op) { + grpc_transport_stream_op_batch* op) { deadline_state->next_on_complete = op->on_complete; grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state, grpc_schedule_on_exec_ctx); @@ -196,16 +196,16 @@ void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, start_timer_if_needed(exec_ctx, elem, new_deadline); } -void grpc_deadline_state_client_start_transport_stream_op( +void grpc_deadline_state_client_start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op* op) { + grpc_transport_stream_op_batch* op) { grpc_deadline_state* deadline_state = elem->call_data; - if (op->cancel_error != GRPC_ERROR_NONE) { + if (op->cancel_stream) { cancel_timer_if_needed(exec_ctx, deadline_state); } else { // Make sure we know when the call is complete, so that we can cancel // the timer. - if (op->recv_trailing_metadata != NULL) { + if (op->recv_trailing_metadata) { inject_on_complete_cb(deadline_state, op); } } @@ -261,10 +261,11 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, } // Method for starting a call op for client filter. -static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - grpc_transport_stream_op* op) { - grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); +static void client_start_transport_stream_op_batch( + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* op) { + grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, + op); // Chain to next filter. grpc_call_next_op(exec_ctx, elem, op); } @@ -282,30 +283,33 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg, } // Method for starting a call op for server filter. -static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - grpc_transport_stream_op* op) { +static void server_start_transport_stream_op_batch( + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* op) { server_call_data* calld = elem->call_data; - if (op->cancel_error != GRPC_ERROR_NONE) { + if (op->cancel_stream) { cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state); } else { // If we're receiving initial metadata, we need to get the deadline // from the recv_initial_metadata_ready callback. So we inject our // own callback into that hook. - if (op->recv_initial_metadata_ready != NULL) { - calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready; - calld->recv_initial_metadata = op->recv_initial_metadata; + if (op->recv_initial_metadata) { + calld->next_recv_initial_metadata_ready = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; grpc_closure_init(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); - op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; } // Make sure we know when the call is complete, so that we can cancel // the timer. // Note that we trigger this on recv_trailing_metadata, even though // the client never sends trailing metadata, because this is the // hook that tells us when the call is complete on the server side. - if (op->recv_trailing_metadata != NULL) { + if (op->recv_trailing_metadata) { inject_on_complete_cb(&calld->base.deadline_state, op); } } @@ -314,7 +318,7 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx, } const grpc_channel_filter grpc_client_deadline_filter = { - client_start_transport_stream_op, + client_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(base_call_data), init_call_elem, @@ -329,7 +333,7 @@ const grpc_channel_filter grpc_client_deadline_filter = { }; const grpc_channel_filter grpc_server_deadline_filter = { - server_start_transport_stream_op, + server_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(server_call_data), init_call_elem, diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index 72cd5cb929..d8db9a9f97 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -83,15 +83,15 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec new_deadline); -// To be called from the client-side filter's start_transport_stream_op() +// To be called from the client-side filter's start_transport_stream_op_batch() // method. Ensures that the deadline timer is cancelled when the call // is completed. // // Note: It is the caller's responsibility to chain to the next filter if // necessary after this function returns. -void grpc_deadline_state_client_start_transport_stream_op( +void grpc_deadline_state_client_start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op* op); + grpc_transport_stream_op_batch* op); // Deadline filters for direct client channels and server channels. // Note: Deadlines for non-direct client channels are handled by the diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 967904df1e..4e47c5c658 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -36,7 +36,7 @@ #include <grpc/support/string_util.h> #include <string.h> #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/security/util/b64.h" +#include "src/core/lib/slice/b64.h" #include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -63,7 +63,7 @@ typedef struct call_data { uint8_t *payload_bytes; /* Vars to read data off of send_message */ - grpc_transport_stream_op send_op; + grpc_transport_stream_op_batch *send_op; uint32_t send_length; uint32_t send_flags; grpc_slice incoming_slice; @@ -219,9 +219,9 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; uint8_t *wrptr = calld->payload_bytes; - while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message, - &calld->incoming_slice, ~(size_t)0, - &calld->got_slice)) { + while (grpc_byte_stream_next( + exec_ctx, calld->send_op->payload->send_message.send_message, + &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), GRPC_SLICE_LENGTH(calld->incoming_slice)); wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); @@ -242,10 +242,11 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { /* Pass down the original send_message op that was blocked.*/ grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, calld->send_flags); - calld->send_op.send_message = &calld->replacement_stream.base; - calld->post_send = calld->send_op.on_complete; - calld->send_op.on_complete = &calld->send_done; - grpc_call_next_op(exec_ctx, elem, &calld->send_op); + calld->send_op->payload->send_message.send_message = + &calld->replacement_stream.base; + calld->post_send = calld->send_op->on_complete; + calld->send_op->on_complete = &calld->send_done; + grpc_call_next_op(exec_ctx, elem, calld->send_op); } else { continue_send_message(exec_ctx, elem); } @@ -253,29 +254,30 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; grpc_error *error; - if (op->send_initial_metadata != NULL) { + if (op->send_initial_metadata) { /* Decide which HTTP VERB to use. We use GET if the request is marked cacheable, and the operation contains both initial metadata and send message, and the payload is below the size threshold, and all the data for this request is immediately available. */ grpc_mdelem method = GRPC_MDELEM_METHOD_POST; - if ((op->send_initial_metadata_flags & + if (op->send_message && + (op->payload->send_initial_metadata.send_initial_metadata_flags & GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) && - op->send_message != NULL && - op->send_message->length < channeld->max_payload_size_for_get) { + op->payload->send_message.send_message->length < + channeld->max_payload_size_for_get) { method = GRPC_MDELEM_METHOD_GET; /* The following write to calld->send_message_blocked isn't racy with reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because being here means ops->send_message is not NULL, which is primarily guarding the read there. */ calld->send_message_blocked = true; - } else if (op->send_initial_metadata_flags & + } else if (op->payload->send_initial_metadata.send_initial_metadata_flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { method = GRPC_MDELEM_METHOD_PUT; } @@ -283,12 +285,13 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, /* Attempt to read the data from send_message and create a header field. */ if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) { /* allocate memory to hold the entire payload */ - calld->payload_bytes = gpr_malloc(op->send_message->length); + calld->payload_bytes = + gpr_malloc(op->payload->send_message.send_message->length); /* read slices of send_message and copy into payload_bytes */ - calld->send_op = *op; - calld->send_length = op->send_message->length; - calld->send_flags = op->send_message->flags; + calld->send_op = op; + calld->send_length = op->payload->send_message.send_message->length; + calld->send_flags = op->payload->send_message.send_message->flags; continue_send_message(exec_ctx, elem); if (calld->send_message_blocked == false) { @@ -299,13 +302,15 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, const unsigned char k_query_separator = '?'; grpc_slice path_slice = - GRPC_MDVALUE(op->send_initial_metadata->idx.named.path->md); + GRPC_MDVALUE(op->payload->send_initial_metadata + .send_initial_metadata->idx.named.path->md); /* sum up individual component's lengths and allocate enough memory to * hold combined path+query */ size_t estimated_len = GRPC_SLICE_LENGTH(path_slice); estimated_len++; /* for the '?' */ estimated_len += grpc_base64_estimate_encoded_size( - op->send_message->length, k_url_safe, k_multi_line); + op->payload->send_message.send_message->length, k_url_safe, + k_multi_line); estimated_len += 1; /* for the trailing 0 */ grpc_slice path_with_query_slice = grpc_slice_malloc(estimated_len); @@ -320,8 +325,8 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, write_ptr++; /* for the '?' */ grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes, - op->send_message->length, k_url_safe, - k_multi_line); + op->payload->send_message.send_message->length, + k_url_safe, k_multi_line); /* remove trailing unused memory and add trailing 0 to terminate string */ @@ -335,14 +340,15 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, /* substitute previous path with the new path+query */ grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices( exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice); - grpc_metadata_batch *b = op->send_initial_metadata; + grpc_metadata_batch *b = + op->payload->send_initial_metadata.send_initial_metadata; error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, mdelem_path_and_query); if (error != GRPC_ERROR_NONE) return error; calld->on_complete = op->on_complete; op->on_complete = &calld->hc_on_complete; - op->send_message = NULL; + op->send_message = false; grpc_slice_unref_internal(exec_ctx, path_with_query_slice); } else { /* Not all data is available. Fall back to POST. */ @@ -353,47 +359,60 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, } } - remove_if_present(exec_ctx, op->send_initial_metadata, GRPC_BATCH_METHOD); - remove_if_present(exec_ctx, op->send_initial_metadata, GRPC_BATCH_SCHEME); - remove_if_present(exec_ctx, op->send_initial_metadata, GRPC_BATCH_TE); - remove_if_present(exec_ctx, op->send_initial_metadata, + remove_if_present(exec_ctx, + op->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_METHOD); + remove_if_present(exec_ctx, + op->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_SCHEME); + remove_if_present(exec_ctx, + op->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_TE); + remove_if_present(exec_ctx, + op->payload->send_initial_metadata.send_initial_metadata, GRPC_BATCH_CONTENT_TYPE); - remove_if_present(exec_ctx, op->send_initial_metadata, + remove_if_present(exec_ctx, + op->payload->send_initial_metadata.send_initial_metadata, GRPC_BATCH_USER_AGENT); /* Send : prefixed headers, which have to be before any application layer headers. */ - error = grpc_metadata_batch_add_head(exec_ctx, op->send_initial_metadata, - &calld->method, method); + error = grpc_metadata_batch_add_head( + exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + &calld->method, method); if (error != GRPC_ERROR_NONE) return error; - error = - grpc_metadata_batch_add_head(exec_ctx, op->send_initial_metadata, - &calld->scheme, channeld->static_scheme); + error = grpc_metadata_batch_add_head( + exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + &calld->scheme, channeld->static_scheme); if (error != GRPC_ERROR_NONE) return error; - error = grpc_metadata_batch_add_tail(exec_ctx, op->send_initial_metadata, - &calld->te_trailers, - GRPC_MDELEM_TE_TRAILERS); + error = grpc_metadata_batch_add_tail( + exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS); if (error != GRPC_ERROR_NONE) return error; error = grpc_metadata_batch_add_tail( - exec_ctx, op->send_initial_metadata, &calld->content_type, - GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); + exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); if (error != GRPC_ERROR_NONE) return error; - error = grpc_metadata_batch_add_tail(exec_ctx, op->send_initial_metadata, - &calld->user_agent, - GRPC_MDELEM_REF(channeld->user_agent)); + error = grpc_metadata_batch_add_tail( + exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent)); if (error != GRPC_ERROR_NONE) return error; } - if (op->recv_initial_metadata != NULL) { + if (op->recv_initial_metadata) { /* substitute our callback for the higher callback */ - calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready; - op->recv_initial_metadata_ready = &calld->hc_on_recv_initial_metadata; + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; + calld->on_done_recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->hc_on_recv_initial_metadata; } - if (op->recv_trailing_metadata != NULL) { + if (op->recv_trailing_metadata) { /* substitute our callback for the higher callback */ - calld->recv_trailing_metadata = op->recv_trailing_metadata; + calld->recv_trailing_metadata = + op->payload->recv_trailing_metadata.recv_trailing_metadata; calld->on_done_recv_trailing_metadata = op->on_complete; op->on_complete = &calld->hc_on_recv_trailing_metadata; } @@ -403,17 +422,17 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { GPR_TIMER_BEGIN("hc_start_transport_op", 0); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); grpc_error *error = hc_mutate_op(exec_ctx, elem, op); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); } else { call_data *calld = elem->call_data; - if (op->send_message != NULL && calld->send_message_blocked) { + if (op->send_message && calld->send_message_blocked) { /* Don't forward the op. send_message contains slices that aren't ready - yet. The call will be forwarded by the op_complete of slice read call. + yet. The call will be forwarded by the op_complete of slice read call. */ } else { grpc_call_next_op(exec_ctx, elem, op); diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index 8d3c488ea0..c1e49ffacc 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -37,7 +37,7 @@ #include <grpc/support/log.h> #include <string.h> #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/security/util/b64.h" +#include "src/core/lib/slice/b64.h" #include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -58,8 +58,7 @@ typedef struct call_data { bool payload_bin_delivered; grpc_metadata_batch *recv_initial_metadata; - bool *recv_idempotent_request; - bool *recv_cacheable_request; + uint32_t *recv_initial_metadata_flags; /** Closure to call when finished with the hs_on_recv hook */ grpc_closure *on_done_recv; /** Closure to call when we retrieve read message from the path URI @@ -116,14 +115,21 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, if (b->idx.named.method != NULL) { if (grpc_mdelem_eq(b->idx.named.method->md, GRPC_MDELEM_METHOD_POST)) { - *calld->recv_idempotent_request = false; - *calld->recv_cacheable_request = false; + *calld->recv_initial_metadata_flags &= + ~(GRPC_INITIAL_METADATA_CACHEABLE_REQUEST | + GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST); } else if (grpc_mdelem_eq(b->idx.named.method->md, GRPC_MDELEM_METHOD_PUT)) { - *calld->recv_idempotent_request = true; + *calld->recv_initial_metadata_flags &= + ~GRPC_INITIAL_METADATA_CACHEABLE_REQUEST; + *calld->recv_initial_metadata_flags |= + GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; } else if (grpc_mdelem_eq(b->idx.named.method->md, GRPC_MDELEM_METHOD_GET)) { - *calld->recv_cacheable_request = true; + *calld->recv_initial_metadata_flags |= + GRPC_INITIAL_METADATA_CACHEABLE_REQUEST; + *calld->recv_initial_metadata_flags &= + ~GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; } else { add_error(error_name, &error, grpc_attach_md_to_error( @@ -206,7 +212,8 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, grpc_error_set_str( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":path"))); - } else if (*calld->recv_cacheable_request == true) { + } else if (*calld->recv_initial_metadata_flags & + GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) { /* We have a cacheable request made with GET verb. The path contains the * query parameter which is base64 encoded request payload. */ const char k_query_separator = '?'; @@ -215,7 +222,7 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, size_t path_length = GRPC_SLICE_LENGTH(path_slice); /* offset of the character '?' */ size_t offset = 0; - for (offset = 0; *path_ptr != k_query_separator && offset < path_length; + for (offset = 0; offset < path_length && *path_ptr != k_query_separator; path_ptr++, offset++) ; if (offset < path_length) { @@ -311,45 +318,53 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, } static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; - if (op->send_initial_metadata != NULL) { + if (op->send_initial_metadata) { grpc_error *error = GRPC_ERROR_NONE; static const char *error_name = "Failed sending initial metadata"; - add_error(error_name, &error, grpc_metadata_batch_add_head( - exec_ctx, op->send_initial_metadata, - &calld->status, GRPC_MDELEM_STATUS_200)); - add_error(error_name, &error, - grpc_metadata_batch_add_tail( - exec_ctx, op->send_initial_metadata, &calld->content_type, - GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC)); + add_error( + error_name, &error, + grpc_metadata_batch_add_head( + exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + &calld->status, GRPC_MDELEM_STATUS_200)); + add_error( + error_name, &error, + grpc_metadata_batch_add_tail( + exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + &calld->content_type, + GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC)); add_error(error_name, &error, - server_filter_outgoing_metadata(exec_ctx, elem, - op->send_initial_metadata)); + server_filter_outgoing_metadata( + exec_ctx, elem, + op->payload->send_initial_metadata.send_initial_metadata)); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); return; } } if (op->recv_initial_metadata) { /* substitute our callback for the higher callback */ - GPR_ASSERT(op->recv_idempotent_request != NULL); - GPR_ASSERT(op->recv_cacheable_request != NULL); - calld->recv_initial_metadata = op->recv_initial_metadata; - calld->recv_idempotent_request = op->recv_idempotent_request; - calld->recv_cacheable_request = op->recv_cacheable_request; - calld->on_done_recv = op->recv_initial_metadata_ready; - op->recv_initial_metadata_ready = &calld->hs_on_recv; + GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags != NULL); + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; + calld->recv_initial_metadata_flags = + op->payload->recv_initial_metadata.recv_flags; + calld->on_done_recv = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->hs_on_recv; } if (op->recv_message) { - calld->recv_message_ready = op->recv_message_ready; - calld->pp_recv_message = op->recv_message; - if (op->recv_message_ready) { - op->recv_message_ready = &calld->hs_recv_message_ready; + calld->recv_message_ready = op->payload->recv_message.recv_message_ready; + calld->pp_recv_message = op->payload->recv_message.recv_message; + if (op->payload->recv_message.recv_message_ready) { + op->payload->recv_message.recv_message_ready = + &calld->hs_recv_message_ready; } if (op->on_complete) { calld->on_complete = op->on_complete; @@ -359,9 +374,10 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (op->send_trailing_metadata) { grpc_error *error = server_filter_outgoing_metadata( - exec_ctx, elem, op->send_trailing_metadata); + exec_ctx, elem, + op->payload->send_trailing_metadata.send_trailing_metadata); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); return; } } @@ -369,7 +385,7 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); GPR_TIMER_BEGIN("hs_start_transport_op", 0); hs_mutate_op(exec_ctx, elem, op); diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 63136650a5..57726c8476 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -132,21 +132,23 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, gpr_free(message_string); } // Invoke the next callback. - grpc_closure_sched(exec_ctx, calld->next_recv_message_ready, error); + grpc_closure_run(exec_ctx, calld->next_recv_message_ready, error); } // Start transport stream op. -static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - grpc_transport_stream_op* op) { +static void start_transport_stream_op_batch( + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* op) { call_data* calld = elem->call_data; // Check max send message size. - if (op->send_message != NULL && calld->max_send_size >= 0 && - op->send_message->length > (size_t)calld->max_send_size) { + if (op->send_message && calld->max_send_size >= 0 && + op->payload->send_message.send_message->length > + (size_t)calld->max_send_size) { char* message_string; gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", - op->send_message->length, calld->max_send_size); - grpc_transport_stream_op_finish_with_failure( + op->payload->send_message.send_message->length, + calld->max_send_size); + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, @@ -155,10 +157,11 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, return; } // Inject callback for receiving a message. - if (op->recv_message_ready != NULL) { - calld->next_recv_message_ready = op->recv_message_ready; - calld->recv_message = op->recv_message; - op->recv_message_ready = &calld->recv_message_ready; + if (op->recv_message) { + calld->next_recv_message_ready = + op->payload->recv_message.recv_message_ready; + calld->recv_message = op->payload->recv_message.recv_message; + op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } // Chain to the next filter. grpc_call_next_op(exec_ctx, elem, op); @@ -253,7 +256,7 @@ static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, } const grpc_channel_filter grpc_message_size_filter = { - start_transport_stream_op, + start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/lib/security/credentials/jwt/json_token.c b/src/core/lib/security/credentials/jwt/json_token.c index 192a5f47ed..aa905725fc 100644 --- a/src/core/lib/security/credentials/jwt/json_token.c +++ b/src/core/lib/security/credentials/jwt/json_token.c @@ -40,8 +40,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> -#include "src/core/lib/security/util/b64.h" #include "src/core/lib/security/util/json_util.h" +#include "src/core/lib/slice/b64.h" #include "src/core/lib/support/string.h" #include <openssl/bio.h> diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index b10a5da2a2..0e2a264371 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -45,7 +45,7 @@ #include "src/core/lib/http/httpcli.h" #include "src/core/lib/iomgr/polling_entity.h" -#include "src/core/lib/security/util/b64.h" +#include "src/core/lib/slice/b64.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" #include "src/core/tsi/ssl_types.h" diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index 8f321b9911..f526653ffa 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -64,7 +64,7 @@ typedef struct { pollset_set so that work can progress when this call wants work to progress */ grpc_polling_entity *pollent; - grpc_transport_stream_op op; + grpc_transport_stream_op_batch op; uint8_t security_context_set; grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; grpc_auth_metadata_context auth_md_context; @@ -108,7 +108,7 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, const char *error_details) { grpc_call_element *elem = (grpc_call_element *)user_data; call_data *calld = elem->call_data; - grpc_transport_stream_op *op = &calld->op; + grpc_transport_stream_op_batch *op = &calld->op; grpc_metadata_batch *mdb; size_t i; reset_auth_metadata_context(&calld->auth_md_context); @@ -122,8 +122,8 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED); } else { GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT); - GPR_ASSERT(op->send_initial_metadata != NULL); - mdb = op->send_initial_metadata; + GPR_ASSERT(op->send_initial_metadata); + mdb = op->payload->send_initial_metadata.send_initial_metadata; for (i = 0; i < num_md; i++) { add_error(&error, grpc_metadata_batch_add_tail( @@ -136,7 +136,7 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, if (error == GRPC_ERROR_NONE) { grpc_call_next_op(exec_ctx, elem, op); } else { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); } } @@ -172,11 +172,13 @@ void build_auth_metadata_context(grpc_security_connector *sc, static void send_security_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_client_security_context *ctx = - (grpc_client_security_context *)op->context[GRPC_CONTEXT_SECURITY].value; + (grpc_client_security_context *)op->payload + ->context[GRPC_CONTEXT_SECURITY] + .value; grpc_call_credentials *channel_call_creds = chand->security_connector->request_metadata_creds; int call_creds_has_md = (ctx != NULL) && (ctx->creds != NULL); @@ -191,7 +193,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, calld->creds = grpc_composite_call_credentials_create(channel_call_creds, ctx->creds, NULL); if (calld->creds == NULL) { - grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -242,7 +244,7 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data, that is being sent or received. */ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { GPR_TIMER_BEGIN("auth_start_transport_op", 0); /* grab pointers to our data from the call element */ @@ -251,23 +253,25 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_linked_mdelem *l; grpc_client_security_context *sec_ctx = NULL; - if (calld->security_context_set == 0 && op->cancel_error == GRPC_ERROR_NONE) { + if (calld->security_context_set == 0 && !op->cancel_stream) { calld->security_context_set = 1; - GPR_ASSERT(op->context); - if (op->context[GRPC_CONTEXT_SECURITY].value == NULL) { - op->context[GRPC_CONTEXT_SECURITY].value = + GPR_ASSERT(op->payload->context != NULL); + if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { + op->payload->context[GRPC_CONTEXT_SECURITY].value = grpc_client_security_context_create(); - op->context[GRPC_CONTEXT_SECURITY].destroy = + op->payload->context[GRPC_CONTEXT_SECURITY].destroy = grpc_client_security_context_destroy; } - sec_ctx = op->context[GRPC_CONTEXT_SECURITY].value; + sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value; GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); sec_ctx->auth_context = GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); } - if (op->send_initial_metadata != NULL) { - for (l = op->send_initial_metadata->list.head; l != NULL; l = l->next) { + if (op->send_initial_metadata) { + for (l = op->payload->send_initial_metadata.send_initial_metadata->list + .head; + l != NULL; l = l->next) { grpc_mdelem md = l->md; /* Pointer comparison is OK for md_elems created from the same context. */ diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 3cf0632220..1aca76f9e8 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -49,7 +49,7 @@ typedef struct call_data { up-call on transport_op, and remember to call our on_done_recv member after handling it. */ grpc_closure auth_on_recv; - grpc_transport_stream_op *transport_op; + grpc_transport_stream_op_batch *transport_op; grpc_metadata_array md; const grpc_metadata *consumed_md; size_t num_consumed_md; @@ -138,12 +138,11 @@ static void on_md_processing_done( error_details = error_details != NULL ? error_details : "Authentication metadata processing failed."; - calld->transport_op->send_initial_metadata = NULL; - if (calld->transport_op->send_message != NULL) { - grpc_byte_stream_destroy(&exec_ctx, calld->transport_op->send_message); - calld->transport_op->send_message = NULL; + if (calld->transport_op->send_message) { + grpc_byte_stream_destroy( + &exec_ctx, calld->transport_op->payload->send_message.send_message); + calld->transport_op->payload->send_message.send_message = NULL; } - calld->transport_op->send_trailing_metadata = NULL; grpc_closure_sched( &exec_ctx, calld->on_done_recv, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), @@ -171,14 +170,17 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, } static void set_recv_ops_md_callbacks(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; - if (op->recv_initial_metadata != NULL) { + if (op->recv_initial_metadata) { /* substitute our callback for the higher callback */ - calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->recv_initial_metadata_ready; - op->recv_initial_metadata_ready = &calld->auth_on_recv; + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; + calld->on_done_recv = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->auth_on_recv; calld->transport_op = op; } } @@ -190,7 +192,7 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem, that is being sent or received. */ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { set_recv_ops_md_callbacks(elem, op); grpc_call_next_op(exec_ctx, elem, op); } diff --git a/src/core/lib/security/util/b64.c b/src/core/lib/slice/b64.c index 0d5a917660..2007cc4810 100644 --- a/src/core/lib/security/util/b64.c +++ b/src/core/lib/slice/b64.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/security/util/b64.h" +#include "src/core/lib/slice/b64.h" #include <stdint.h> #include <string.h> diff --git a/src/core/lib/security/util/b64.h b/src/core/lib/slice/b64.h index ef52291c6a..5cc821f4bf 100644 --- a/src/core/lib/security/util/b64.h +++ b/src/core/lib/slice/b64.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_SECURITY_UTIL_B64_H -#define GRPC_CORE_LIB_SECURITY_UTIL_B64_H +#ifndef GRPC_CORE_LIB_SLICE_B64_H +#define GRPC_CORE_LIB_SLICE_B64_H #include <grpc/slice.h> @@ -62,4 +62,4 @@ grpc_slice grpc_base64_decode(grpc_exec_ctx *exec_ctx, const char *b64, grpc_slice grpc_base64_decode_with_len(grpc_exec_ctx *exec_ctx, const char *b64, size_t b64_len, int url_safe); -#endif /* GRPC_CORE_LIB_SECURITY_UTIL_B64_H */ +#endif /* GRPC_CORE_LIB_SLICE_B64_H */ diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index a9317a4694..87787b3eea 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -117,25 +117,32 @@ static received_status unpack_received_status(gpr_atm atm) { typedef struct batch_control { grpc_call *call; - grpc_cq_completion cq_completion; + /* Share memory for cq_completion and notify_tag as they are never needed + simultaneously. Each byte used in this data structure count as six bytes + per call, so any savings we can make are worthwhile, + + We use notify_tag to determine whether or not to send notification to the + completion queue. Once we've made that determination, we can reuse the + memory for cq_completion. */ + union { + grpc_cq_completion cq_completion; + struct { + /* Any given op indicates completion by either (a) calling a closure or + (b) sending a notification on the call's completion queue. If + \a is_closure is true, \a tag indicates a closure to be invoked; + otherwise, \a tag indicates the tag to be used in the notification to + be sent to the completion queue. */ + void *tag; + bool is_closure; + } notify_tag; + } completion_data; grpc_closure finish_batch; - void *notify_tag; gpr_refcount steps_to_complete; grpc_error *errors[MAX_ERRORS_PER_BATCH]; gpr_atm num_errors; - uint8_t send_initial_metadata; - uint8_t send_message; - uint8_t send_final_op; - uint8_t recv_initial_metadata; - uint8_t recv_message; - uint8_t recv_final_op; - uint8_t is_notify_tag_closure; - - /* TODO(ctiller): now that this is inlined, figure out how much of the above - state can be eliminated */ - grpc_transport_stream_op op; + grpc_transport_stream_op_batch op; } batch_control; struct grpc_call { @@ -169,6 +176,7 @@ struct grpc_call { bool has_initial_md_been_received; batch_control active_batches[MAX_CONCURRENT_BATCHES]; + grpc_transport_stream_op_batch_payload stream_op_payload; /* first idx: is_receiving, second idx: is_trailing */ grpc_metadata_batch metadata_batch[2][2]; @@ -239,7 +247,7 @@ int grpc_call_error_trace = 0; CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op *op); + grpc_transport_stream_op_batch *op); static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_status_code status, const char *description); @@ -291,6 +299,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = args->server_transport_data == NULL; + call->stream_op_payload.context = call->context; grpc_slice path = grpc_empty_slice(); if (call->is_client) { GPR_ASSERT(args->add_initial_metadata_count < @@ -535,13 +544,12 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { } static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { grpc_call_element *elem; GPR_TIMER_BEGIN("execute_op", 0); elem = CALL_ELEM_FROM_CALL(call, 0); - op->context = call->context; - elem->filter->start_transport_stream_op(exec_ctx, elem, op); + elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op); GPR_TIMER_END("execute_op", 0); } @@ -594,9 +602,10 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_error *error) { GRPC_CALL_INTERNAL_REF(c, "termination"); set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); - grpc_transport_stream_op *op = grpc_make_transport_stream_op( + grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op( grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx)); - op->cancel_error = error; + op->cancel_stream = true; + op->payload->cancel_stream.cancel_error = error; execute_op(exec_ctx, c, op); } @@ -1025,16 +1034,13 @@ static batch_control *allocate_batch_control(grpc_call *call, const grpc_op *ops, size_t num_ops) { int slot = batch_slot_for_op(ops[0].op); - for (size_t i = 1; i < num_ops; i++) { - int op_slot = batch_slot_for_op(ops[i].op); - slot = GPR_MIN(slot, op_slot); - } batch_control *bctl = &call->active_batches[slot]; if (bctl->call != NULL) { return NULL; } memset(bctl, 0, sizeof(*bctl)); bctl->call = call; + bctl->op.payload = &call->stream_op_payload; return bctl; } @@ -1074,20 +1080,20 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, grpc_call *call = bctl->call; grpc_error *error = consolidate_batch_errors(bctl); - if (bctl->send_initial_metadata) { + if (bctl->op.send_initial_metadata) { grpc_metadata_batch_destroy( exec_ctx, &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); } - if (bctl->send_message) { + if (bctl->op.send_message) { call->sending_message = false; } - if (bctl->send_final_op) { + if (bctl->op.send_trailing_metadata) { grpc_metadata_batch_destroy( exec_ctx, &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } - if (bctl->recv_final_op) { + if (bctl->op.recv_trailing_metadata) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; recv_trailing_filter(exec_ctx, call, md); @@ -1123,15 +1129,16 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, error = GRPC_ERROR_NONE; } - if (bctl->is_notify_tag_closure) { + if (bctl->completion_data.notify_tag.is_closure) { /* unrefs bctl->error */ bctl->call = NULL; - grpc_closure_run(exec_ctx, bctl->notify_tag, error); + grpc_closure_run(exec_ctx, bctl->completion_data.notify_tag.tag, error); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { /* unrefs bctl->error */ - grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, error, - finish_batch_completion, bctl, &bctl->cq_completion); + grpc_cq_end_op( + exec_ctx, bctl->call->cq, bctl->completion_data.notify_tag.tag, error, + finish_batch_completion, bctl, &bctl->completion_data.cq_completion); } } @@ -1374,11 +1381,13 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (bctl == NULL) { return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; } - bctl->notify_tag = notify_tag; - bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); + bctl->completion_data.notify_tag.tag = notify_tag; + bctl->completion_data.notify_tag.is_closure = + (uint8_t)(is_notify_tag_closure != 0); - grpc_transport_stream_op *stream_op = &bctl->op; - memset(stream_op, 0, sizeof(*stream_op)); + grpc_transport_stream_op_batch *stream_op = &bctl->op; + grpc_transport_stream_op_batch_payload *stream_op_payload = + &call->stream_op_payload; stream_op->covered_by_poller = true; /* rewrite batch ops into a transport op */ @@ -1432,8 +1441,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } - bctl->send_initial_metadata = 1; - call->sent_initial_metadata = 1; + stream_op->send_initial_metadata = true; + call->sent_initial_metadata = true; if (!prepare_application_metadata( exec_ctx, call, (int)op->data.send_initial_metadata.count, op->data.send_initial_metadata.metadata, 0, call->is_client, @@ -1443,9 +1452,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } /* TODO(ctiller): just make these the same variable? */ call->metadata_batch[0][0].deadline = call->send_deadline; - stream_op->send_initial_metadata = + stream_op_payload->send_initial_metadata.send_initial_metadata = &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; - stream_op->send_initial_metadata_flags = op->flags; + stream_op_payload->send_initial_metadata.send_initial_metadata_flags = + op->flags; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { @@ -1460,8 +1470,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - bctl->send_message = 1; - call->sending_message = 1; + stream_op->send_message = true; + call->sending_message = true; grpc_slice_buffer_stream_init( &call->sending_stream, &op->data.send_message.send_message->data.raw.slice_buffer, @@ -1473,7 +1483,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_COMPRESS_NONE) { call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - stream_op->send_message = &call->sending_stream.base; + stream_op_payload->send_message.send_message = + &call->sending_stream.base; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ @@ -1489,9 +1500,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - bctl->send_final_op = 1; - call->sent_final_op = 1; - stream_op->send_trailing_metadata = + stream_op->send_trailing_metadata = true; + call->sent_final_op = true; + stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_SEND_STATUS_FROM_SERVER: @@ -1513,8 +1524,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } - bctl->send_final_op = 1; - call->sent_final_op = 1; + stream_op->send_trailing_metadata = true; + call->sent_final_op = true; GPR_ASSERT(call->send_extra_metadata_count == 0); call->send_extra_metadata_count = 1; call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( @@ -1553,7 +1564,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } - stream_op->send_trailing_metadata = + stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_RECV_INITIAL_METADATA: @@ -1570,16 +1581,16 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, from server.c. In that case, it's coming from accept_stream, and in that case we're not necessarily covered by a poller. */ stream_op->covered_by_poller = call->is_client; - call->received_initial_metadata = 1; + call->received_initial_metadata = true; call->buffered_metadata[0] = op->data.recv_initial_metadata.recv_initial_metadata; grpc_closure_init(&call->receiving_initial_metadata_ready, receiving_initial_metadata_ready, bctl, grpc_schedule_on_exec_ctx); - bctl->recv_initial_metadata = 1; - stream_op->recv_initial_metadata = + stream_op->recv_initial_metadata = true; + stream_op_payload->recv_initial_metadata.recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - stream_op->recv_initial_metadata_ready = + stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready = &call->receiving_initial_metadata_ready; num_completion_callbacks_needed++; break; @@ -1593,13 +1604,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->receiving_message = 1; - bctl->recv_message = 1; + call->receiving_message = true; + stream_op->recv_message = true; call->receiving_buffer = op->data.recv_message.recv_message; - stream_op->recv_message = &call->receiving_stream; + stream_op_payload->recv_message.recv_message = &call->receiving_stream; grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, bctl, grpc_schedule_on_exec_ctx); - stream_op->recv_message_ready = &call->receiving_stream_ready; + stream_op_payload->recv_message.recv_message_ready = + &call->receiving_stream_ready; num_completion_callbacks_needed++; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: @@ -1616,16 +1628,17 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->requested_final_op = 1; + call->requested_final_op = true; call->buffered_metadata[1] = op->data.recv_status_on_client.trailing_metadata; call->final_op.client.status = op->data.recv_status_on_client.status; call->final_op.client.status_details = op->data.recv_status_on_client.status_details; - bctl->recv_final_op = 1; - stream_op->recv_trailing_metadata = + stream_op->recv_trailing_metadata = true; + stream_op->collect_stats = true; + stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op->collect_stats = + stream_op_payload->collect_stats.collect_stats = &call->final_info.stats.transport_stream_stats; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: @@ -1642,13 +1655,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->requested_final_op = 1; + call->requested_final_op = true; call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; - bctl->recv_final_op = 1; - stream_op->recv_trailing_metadata = + stream_op->recv_trailing_metadata = true; + stream_op->collect_stats = true; + stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op->collect_stats = + stream_op_payload->collect_stats.collect_stats = &call->final_info.stats.transport_stream_stats; break; } @@ -1660,7 +1674,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); - stream_op->context = call->context; grpc_closure_init(&bctl->finish_batch, finish_batch, bctl, grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch; @@ -1674,26 +1687,26 @@ done: done_with_error: /* reverse any mutations that occured */ - if (bctl->send_initial_metadata) { - call->sent_initial_metadata = 0; + if (stream_op->send_initial_metadata) { + call->sent_initial_metadata = false; grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]); } - if (bctl->send_message) { - call->sending_message = 0; + if (stream_op->send_message) { + call->sending_message = false; grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base); } - if (bctl->send_final_op) { - call->sent_final_op = 0; + if (stream_op->send_trailing_metadata) { + call->sent_final_op = false; grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]); } - if (bctl->recv_initial_metadata) { - call->received_initial_metadata = 0; + if (stream_op->recv_initial_metadata) { + call->received_initial_metadata = false; } - if (bctl->recv_message) { - call->receiving_message = 0; + if (stream_op->recv_message) { + call->receiving_message = false; } - if (bctl->recv_final_op) { - call->requested_final_op = 0; + if (stream_op->recv_trailing_metadata) { + call->requested_final_op = false; } goto done; } diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index b4bfb92042..b3ba826bbc 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -150,17 +150,20 @@ grpc_channel *grpc_channel_create_with_builder( } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) { channel->compression_options.default_level.is_set = true; - GPR_ASSERT(args->args[i].value.integer >= 0 && - args->args[i].value.integer < GRPC_COMPRESS_LEVEL_COUNT); channel->compression_options.default_level.level = - (grpc_compression_level)args->args[i].value.integer; + (grpc_compression_level)grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){GRPC_COMPRESS_LEVEL_NONE, + GRPC_COMPRESS_LEVEL_NONE, + GRPC_COMPRESS_LEVEL_COUNT - 1}); } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) { channel->compression_options.default_algorithm.is_set = true; - GPR_ASSERT(args->args[i].value.integer >= 0 && - args->args[i].value.integer < GRPC_COMPRESS_ALGORITHMS_COUNT); channel->compression_options.default_algorithm.algorithm = - (grpc_compression_algorithm)args->args[i].value.integer; + (grpc_compression_algorithm)grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, + GRPC_COMPRESS_ALGORITHMS_COUNT - 1}); } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) { diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 0c408aa288..82428c42c0 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -80,16 +80,18 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, mdb->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } -static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void lame_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - if (op->recv_initial_metadata != NULL) { - fill_metadata(exec_ctx, elem, op->recv_initial_metadata); - } else if (op->recv_trailing_metadata != NULL) { - fill_metadata(exec_ctx, elem, op->recv_trailing_metadata); + if (op->recv_initial_metadata) { + fill_metadata(exec_ctx, elem, + op->payload->recv_initial_metadata.recv_initial_metadata); + } else if (op->recv_trailing_metadata) { + fill_metadata(exec_ctx, elem, + op->payload->recv_trailing_metadata.recv_trailing_metadata); } - grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); } @@ -148,7 +150,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} const grpc_channel_filter grpc_lame_filter = { - lame_start_transport_stream_op, + lame_start_transport_stream_op_batch, lame_start_transport_op, sizeof(call_data), init_call_elem, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index a123c9ca43..191ee75252 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -154,8 +154,7 @@ struct call_data { grpc_completion_queue *cq_new; grpc_metadata_batch *recv_initial_metadata; - bool recv_idempotent_request; - bool recv_cacheable_request; + uint32_t recv_initial_metadata_flags; grpc_metadata_array initial_metadata; request_matcher *request_matcher; @@ -498,13 +497,7 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, rc->data.batch.details->host = grpc_slice_ref_internal(calld->host); rc->data.batch.details->method = grpc_slice_ref_internal(calld->path); rc->data.batch.details->deadline = calld->deadline; - rc->data.batch.details->flags = - (calld->recv_idempotent_request - ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST - : 0) | - (calld->recv_cacheable_request - ? GRPC_INITIAL_METADATA_CACHEABLE_REQUEST - : 0); + rc->data.batch.details->flags = calld->recv_initial_metadata_flags; break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; @@ -632,7 +625,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if (!grpc_slice_eq(rm->host, calld->host)) continue; if (!grpc_slice_eq(rm->method, calld->path)) continue; if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && - !calld->recv_idempotent_request) { + 0 == (calld->recv_initial_metadata_flags & + GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) { continue; } finish_start_new_rpc(exec_ctx, server, elem, @@ -649,7 +643,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if (rm->has_host) continue; if (!grpc_slice_eq(rm->method, calld->path)) continue; if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && - !calld->recv_idempotent_request) { + 0 == (calld->recv_initial_metadata_flags & + GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) { continue; } finish_start_new_rpc(exec_ctx, server, elem, @@ -781,22 +776,25 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, } static void server_mutate_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; - if (op->recv_initial_metadata != NULL) { - GPR_ASSERT(op->recv_idempotent_request == NULL); - calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready; - op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata; - op->recv_idempotent_request = &calld->recv_idempotent_request; - op->recv_cacheable_request = &calld->recv_cacheable_request; + if (op->recv_initial_metadata) { + GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == NULL); + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; + calld->on_done_recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->server_on_recv_initial_metadata; + op->payload->recv_initial_metadata.recv_flags = + &calld->recv_initial_metadata_flags; } } -static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void server_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); server_mutate_op(elem, op); grpc_call_next_op(exec_ctx, elem, op); @@ -960,7 +958,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } const grpc_channel_filter grpc_server_top_filter = { - server_start_transport_stream_op, + server_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index d56cb31ee0..82c4e004b7 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -170,7 +170,7 @@ int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx, void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { transport->vtable->perform_stream_op(exec_ctx, transport, stream, op); } @@ -213,14 +213,23 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, return transport->vtable->get_endpoint(exec_ctx, transport); } -void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, - grpc_transport_stream_op *op, - grpc_error *error) { - grpc_closure_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error)); - grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready, - GRPC_ERROR_REF(error)); +void grpc_transport_stream_op_batch_finish_with_failure( + grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, + grpc_error *error) { + if (op->recv_message) { + grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready, + GRPC_ERROR_REF(error)); + } + if (op->recv_initial_metadata) { + grpc_closure_sched( + exec_ctx, + op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); + } grpc_closure_sched(exec_ctx, op->on_complete, error); - GRPC_ERROR_UNREF(op->cancel_error); + if (op->cancel_stream) { + GRPC_ERROR_UNREF(op->payload->cancel_stream.cancel_error); + } } typedef struct { @@ -249,7 +258,8 @@ grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) { typedef struct { grpc_closure outer_on_complete; grpc_closure *inner_on_complete; - grpc_transport_stream_op op; + grpc_transport_stream_op_batch op; + grpc_transport_stream_op_batch_payload payload; } made_transport_stream_op; static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg, @@ -260,13 +270,13 @@ static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg, grpc_closure_run(exec_ctx, c, GRPC_ERROR_REF(error)); } -grpc_transport_stream_op *grpc_make_transport_stream_op( +grpc_transport_stream_op_batch *grpc_make_transport_stream_op( grpc_closure *on_complete) { - made_transport_stream_op *op = gpr_malloc(sizeof(*op)); + made_transport_stream_op *op = gpr_zalloc(sizeof(*op)); + op->op.payload = &op->payload; grpc_closure_init(&op->outer_on_complete, destroy_made_transport_stream_op, op, grpc_schedule_on_exec_ctx); op->inner_on_complete = on_complete; - memset(&op->op, 0, sizeof(op->op)); op->op.on_complete = &op->outer_on_complete; return &op->op; } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 950b18aeda..93369cc689 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -109,55 +109,98 @@ void grpc_transport_move_stats(grpc_transport_stream_stats *from, grpc_transport_stream_stats *to); typedef struct { + void *extra_arg; grpc_closure closure; - void *args[2]; -} grpc_transport_private_op_data; +} grpc_handler_private_op_data; + +typedef struct grpc_transport_stream_op_batch_payload + grpc_transport_stream_op_batch_payload; /* Transport stream op: a set of operations to perform on a transport against a single stream */ -typedef struct grpc_transport_stream_op { +typedef struct grpc_transport_stream_op_batch { /** Should be enqueued when all requested operations (excluding recv_message and recv_initial_metadata which have their own closures) in a given batch have been completed. */ grpc_closure *on_complete; + /** Values for the stream op (fields set are determined by flags above) */ + grpc_transport_stream_op_batch_payload *payload; + /** Is the completion of this op covered by a poller (if false: the op should complete independently of some pollset being polled) */ - bool covered_by_poller; + bool covered_by_poller : 1; - /** Send initial metadata to the peer, from the provided metadata batch. - idempotent_request MUST be set if this is non-null */ - grpc_metadata_batch *send_initial_metadata; - /** Iff send_initial_metadata != NULL, flags associated with - send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */ - uint32_t send_initial_metadata_flags; + /** Send initial metadata to the peer, from the provided metadata batch. */ + bool send_initial_metadata : 1; /** Send trailing metadata to the peer, from the provided metadata batch. */ - grpc_metadata_batch *send_trailing_metadata; + bool send_trailing_metadata : 1; /** Send message data to the peer, from the provided byte stream. */ - grpc_byte_stream *send_message; + bool send_message : 1; /** Receive initial metadata from the stream, into provided metadata batch. */ - grpc_metadata_batch *recv_initial_metadata; - bool *recv_idempotent_request; - bool *recv_cacheable_request; - /** Should be enqueued when initial metadata is ready to be processed. */ - grpc_closure *recv_initial_metadata_ready; + bool recv_initial_metadata : 1; /** Receive message data from the stream, into provided byte stream. */ - grpc_byte_stream **recv_message; - /** Should be enqueued when one message is ready to be processed. */ - grpc_closure *recv_message_ready; + bool recv_message : 1; /** Receive trailing metadata from the stream, into provided metadata batch. */ - grpc_metadata_batch *recv_trailing_metadata; + bool recv_trailing_metadata : 1; /** Collect any stats into provided buffer, zero internal stat counters */ - grpc_transport_stream_stats *collect_stats; + bool collect_stats : 1; + + /** Cancel this stream with the provided error */ + bool cancel_stream : 1; + + /*************************************************************************** + * remaining fields are initialized and used at the discretion of the + * current handler of the op */ - /** If != GRPC_ERROR_NONE, forcefully close this stream. + grpc_handler_private_op_data handler_private; +} grpc_transport_stream_op_batch; + +struct grpc_transport_stream_op_batch_payload { + struct { + grpc_metadata_batch *send_initial_metadata; + /** Iff send_initial_metadata != NULL, flags associated with + send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */ + uint32_t send_initial_metadata_flags; + } send_initial_metadata; + + struct { + grpc_metadata_batch *send_trailing_metadata; + } send_trailing_metadata; + + struct { + grpc_byte_stream *send_message; + } send_message; + + struct { + grpc_metadata_batch *recv_initial_metadata; + uint32_t *recv_flags; + /** Should be enqueued when initial metadata is ready to be processed. */ + grpc_closure *recv_initial_metadata_ready; + } recv_initial_metadata; + + struct { + grpc_byte_stream **recv_message; + /** Should be enqueued when one message is ready to be processed. */ + grpc_closure *recv_message_ready; + } recv_message; + + struct { + grpc_metadata_batch *recv_trailing_metadata; + } recv_trailing_metadata; + + struct { + grpc_transport_stream_stats *collect_stats; + } collect_stats; + + /** Forcefully close this stream. The HTTP2 semantics should be: - server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and trailing metadata has not been sent, send trailing metadata with status @@ -167,17 +210,13 @@ typedef struct grpc_transport_stream_op { convert to a HTTP2 error code using grpc_chttp2_grpc_status_to_http2_error. Send a RST_STREAM with this error. */ - grpc_error *cancel_error; + struct { + grpc_error *cancel_error; + } cancel_stream; /* Indexes correspond to grpc_context_index enum values */ grpc_call_context_element *context; - - /*************************************************************************** - * remaining fields are initialized and used at the discretion of the - * current handler of the op */ - - grpc_transport_private_op_data handler_private; -} grpc_transport_stream_op; +}; /** Transport op: a set of operations to perform on a transport as a whole */ typedef struct grpc_transport_op { @@ -210,7 +249,7 @@ typedef struct grpc_transport_op { * remaining fields are initialized and used at the discretion of the * transport implementation */ - grpc_transport_private_op_data transport_private; + grpc_handler_private_op_data handler_private; } grpc_transport_op; /* Returns the amount of memory required to store a grpc_stream for this @@ -250,11 +289,11 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_stream *stream, grpc_closure *then_schedule_closure); -void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, - grpc_transport_stream_op *op, - grpc_error *error); +void grpc_transport_stream_op_batch_finish_with_failure( + grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, + grpc_error *error); -char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); +char *grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op); char *grpc_transport_op_string(grpc_transport_op *op); /* Send a batch of operations on a transport @@ -265,11 +304,12 @@ char *grpc_transport_op_string(grpc_transport_op *op); transport - the transport on which to initiate the stream stream - the stream on which to send the operations. This must be non-NULL and previously initialized by the same transport. - op - a grpc_transport_stream_op specifying the op to perform */ + op - a grpc_transport_stream_op_batch specifying the op to perform + */ void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, - grpc_transport_stream_op *op); + grpc_transport_stream_op_batch *op); void grpc_transport_perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *transport, @@ -301,9 +341,10 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, /* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to \a on_consumed and then delete the returned transport op */ grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed); -/* Allocate a grpc_transport_stream_op, and preconfigure the on_consumed closure +/* Allocate a grpc_transport_stream_op_batch, and preconfigure the on_consumed + closure to \a on_consumed and then delete the returned transport op */ -grpc_transport_stream_op *grpc_make_transport_stream_op( +grpc_transport_stream_op_batch *grpc_make_transport_stream_op( grpc_closure *on_consumed); #ifdef __cplusplus diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index 6f688bf8d2..bbb19a34bd 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -59,7 +59,8 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_perform_stream_op */ void (*perform_stream_op)(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream, grpc_transport_stream_op *op); + grpc_stream *stream, + grpc_transport_stream_op_batch *op); /* implementation of grpc_transport_perform_op */ void (*perform_op)(grpc_exec_ctx *exec_ctx, grpc_transport *self, diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c index 28360e3784..3a2a793e01 100644 --- a/src/core/lib/transport/transport_op_string.c +++ b/src/core/lib/transport/transport_op_string.c @@ -71,7 +71,8 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { } } -char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { +char *grpc_transport_stream_op_batch_string( + grpc_transport_stream_op_batch *op) { char *tmp; char *out; @@ -81,45 +82,49 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { gpr_strvec_add( &b, gpr_strdup(op->covered_by_poller ? "[COVERED]" : "[UNCOVERED]")); - if (op->send_initial_metadata != NULL) { + if (op->send_initial_metadata) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("SEND_INITIAL_METADATA{")); - put_metadata_list(&b, *op->send_initial_metadata); + put_metadata_list( + &b, *op->payload->send_initial_metadata.send_initial_metadata); gpr_strvec_add(&b, gpr_strdup("}")); } - if (op->send_message != NULL) { + if (op->send_message) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d", - op->send_message->flags, op->send_message->length); + op->payload->send_message.send_message->flags, + op->payload->send_message.send_message->length); gpr_strvec_add(&b, tmp); } - if (op->send_trailing_metadata != NULL) { + if (op->send_trailing_metadata) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("SEND_TRAILING_METADATA{")); - put_metadata_list(&b, *op->send_trailing_metadata); + put_metadata_list( + &b, *op->payload->send_trailing_metadata.send_trailing_metadata); gpr_strvec_add(&b, gpr_strdup("}")); } - if (op->recv_initial_metadata != NULL) { + if (op->recv_initial_metadata) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("RECV_INITIAL_METADATA")); } - if (op->recv_message != NULL) { + if (op->recv_message) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE")); } - if (op->recv_trailing_metadata != NULL) { + if (op->recv_trailing_metadata) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("RECV_TRAILING_METADATA")); } - if (op->cancel_error != GRPC_ERROR_NONE) { + if (op->cancel_stream) { gpr_strvec_add(&b, gpr_strdup(" ")); - const char *msg = grpc_error_string(op->cancel_error); + const char *msg = + grpc_error_string(op->payload->cancel_stream.cancel_error); gpr_asprintf(&tmp, "CANCEL:%s", msg); gpr_strvec_add(&b, tmp); @@ -204,8 +209,9 @@ char *grpc_transport_op_string(grpc_transport_op *op) { } void grpc_call_log_op(char *file, int line, gpr_log_severity severity, - grpc_call_element *elem, grpc_transport_stream_op *op) { - char *str = grpc_transport_stream_op_string(op); + grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { + char *str = grpc_transport_stream_op_batch_string(op); gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str); gpr_free(str); } diff --git a/src/core/tsi/transport_security.c b/src/core/tsi/transport_security.c index a3e42e87ec..67ebe1b1f3 100644 --- a/src/core/tsi/transport_security.c +++ b/src/core/tsi/transport_security.c @@ -101,7 +101,7 @@ tsi_result tsi_frame_protector_protect_flush( tsi_frame_protector *self, unsigned char *protected_output_frames, size_t *protected_output_frames_size, size_t *still_pending_size) { if (self == NULL || protected_output_frames == NULL || - protected_output_frames == NULL || still_pending_size == NULL) { + protected_output_frames_size == NULL || still_pending_size == NULL) { return TSI_INVALID_ARGUMENT; } return self->vtable->protect_flush(self, protected_output_frames, |