diff options
Diffstat (limited to 'src/core/ext/filters/http')
4 files changed, 160 insertions, 215 deletions
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index a625369b02..a1fb10f5b8 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -68,12 +68,11 @@ typedef struct channel_data { size_t max_payload_size_for_get; } channel_data; -static grpc_error* client_filter_incoming_metadata(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static grpc_error* client_filter_incoming_metadata(grpc_call_element* elem, grpc_metadata_batch* b) { if (b->idx.named.status != nullptr) { if (grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) { - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.status); + grpc_metadata_batch_remove(b, b->idx.named.status); } else { char* val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.status->md), GPR_DUMP_ASCII); @@ -98,10 +97,9 @@ static grpc_error* client_filter_incoming_metadata(grpc_exec_ctx* exec_ctx, GRPC_MDVALUE(b->idx.named.grpc_message->md)); if (grpc_slice_is_equivalent(pct_decoded_msg, GRPC_MDVALUE(b->idx.named.grpc_message->md))) { - grpc_slice_unref_internal(exec_ctx, pct_decoded_msg); + grpc_slice_unref_internal(pct_decoded_msg); } else { - grpc_metadata_batch_set_value(exec_ctx, b->idx.named.grpc_message, - pct_decoded_msg); + grpc_metadata_batch_set_value(b->idx.named.grpc_message, pct_decoded_msg); } } @@ -131,60 +129,53 @@ static grpc_error* client_filter_incoming_metadata(grpc_exec_ctx* exec_ctx, gpr_free(val); } } - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_type); + grpc_metadata_batch_remove(b, b->idx.named.content_type); } return GRPC_ERROR_NONE; } -static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, - void* user_data, grpc_error* error) { +static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)user_data; call_data* calld = (call_data*)elem->call_data; if (error == GRPC_ERROR_NONE) { - error = client_filter_incoming_metadata(exec_ctx, elem, - calld->recv_initial_metadata); + error = client_filter_incoming_metadata(elem, calld->recv_initial_metadata); } else { GRPC_ERROR_REF(error); } - GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready, - error); + GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error); } -static void recv_trailing_metadata_on_complete(grpc_exec_ctx* exec_ctx, - void* user_data, +static void recv_trailing_metadata_on_complete(void* user_data, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)user_data; call_data* calld = (call_data*)elem->call_data; if (error == GRPC_ERROR_NONE) { - error = client_filter_incoming_metadata(exec_ctx, elem, - calld->recv_trailing_metadata); + error = + client_filter_incoming_metadata(elem, calld->recv_trailing_metadata); } else { GRPC_ERROR_REF(error); } - GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_trailing_metadata_on_complete, - error); + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_on_complete, error); } -static void send_message_on_complete(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void send_message_on_complete(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; - grpc_byte_stream_cache_destroy(exec_ctx, &calld->send_message_cache); - GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete, + grpc_byte_stream_cache_destroy(&calld->send_message_cache); + GRPC_CLOSURE_RUN(calld->original_send_message_on_complete, GRPC_ERROR_REF(error)); } // Pulls a slice from the send_message byte stream, updating // calld->send_message_bytes_read. -static grpc_error* pull_slice_from_send_message(grpc_exec_ctx* exec_ctx, - call_data* calld) { +static grpc_error* pull_slice_from_send_message(call_data* calld) { grpc_slice incoming_slice; grpc_error* error = grpc_byte_stream_pull( - exec_ctx, &calld->send_message_caching_stream.base, &incoming_slice); + &calld->send_message_caching_stream.base, &incoming_slice); if (error == GRPC_ERROR_NONE) { calld->send_message_bytes_read += GRPC_SLICE_LENGTH(incoming_slice); - grpc_slice_unref_internal(exec_ctx, incoming_slice); + grpc_slice_unref_internal(incoming_slice); } return error; } @@ -194,12 +185,10 @@ static grpc_error* pull_slice_from_send_message(grpc_exec_ctx* exec_ctx, // calld->send_message_caching_stream.base.length, then we have completed // reading from the byte stream; otherwise, an async read has been dispatched // and on_send_message_next_done() will be invoked when it is complete. -static grpc_error* read_all_available_send_message_data(grpc_exec_ctx* exec_ctx, - call_data* calld) { - while (grpc_byte_stream_next(exec_ctx, - &calld->send_message_caching_stream.base, +static grpc_error* read_all_available_send_message_data(call_data* calld) { + while (grpc_byte_stream_next(&calld->send_message_caching_stream.base, ~(size_t)0, &calld->on_send_message_next_done)) { - grpc_error* error = pull_slice_from_send_message(exec_ctx, calld); + grpc_error* error = pull_slice_from_send_message(calld); if (error != GRPC_ERROR_NONE) return error; if (calld->send_message_bytes_read == calld->send_message_caching_stream.base.length) { @@ -210,19 +199,18 @@ static grpc_error* read_all_available_send_message_data(grpc_exec_ctx* exec_ctx, } // Async callback for grpc_byte_stream_next(). -static void on_send_message_next_done(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void on_send_message_next_done(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error, calld->call_combiner); + calld->send_message_batch, error, calld->call_combiner); return; } - error = pull_slice_from_send_message(exec_ctx, calld); + error = pull_slice_from_send_message(calld); if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error, calld->call_combiner); + calld->send_message_batch, error, calld->call_combiner); return; } // There may or may not be more to read, but we don't care. If we got @@ -230,7 +218,7 @@ static void on_send_message_next_done(grpc_exec_ctx* exec_ctx, void* arg, // synchronously, so we were not able to do a cached call. Instead, // we just reset the byte stream and then send down the batch as-is. grpc_caching_byte_stream_reset(&calld->send_message_caching_stream); - grpc_call_next_op(exec_ctx, elem, calld->send_message_batch); + grpc_call_next_op(elem, calld->send_message_batch); } static char* slice_buffer_to_string(grpc_slice_buffer* slice_buffer) { @@ -248,8 +236,7 @@ static char* slice_buffer_to_string(grpc_slice_buffer* slice_buffer) { // Modifies the path entry in the batch's send_initial_metadata to // append the base64-encoded query for a GET request. -static grpc_error* update_path_for_get(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static grpc_error* update_path_for_get(grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = (call_data*)elem->call_data; grpc_slice path_slice = @@ -282,24 +269,22 @@ static grpc_error* update_path_for_get(grpc_exec_ctx* exec_ctx, grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t)); /* substitute previous path with the new path+query */ grpc_mdelem mdelem_path_and_query = - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice); + grpc_mdelem_from_slices(GRPC_MDSTR_PATH, path_with_query_slice); grpc_metadata_batch* b = batch->payload->send_initial_metadata.send_initial_metadata; - return grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, + return grpc_metadata_batch_substitute(b, b->idx.named.path, mdelem_path_and_query); } -static void remove_if_present(grpc_exec_ctx* exec_ctx, - grpc_metadata_batch* batch, +static void remove_if_present(grpc_metadata_batch* batch, grpc_metadata_batch_callouts_index idx) { if (batch->idx.array[idx] != nullptr) { - grpc_metadata_batch_remove(exec_ctx, batch, batch->idx.array[idx]); + grpc_metadata_batch_remove(batch, batch->idx.array[idx]); } } static void hc_start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* batch) { + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = (call_data*)elem->call_data; channel_data* channeld = (channel_data*)elem->channel_data; GPR_TIMER_BEGIN("hc_start_transport_stream_op_batch", 0); @@ -345,17 +330,16 @@ static void hc_start_transport_stream_op_batch( calld->original_send_message_on_complete = batch->on_complete; batch->on_complete = &calld->send_message_on_complete; calld->send_message_batch = batch; - error = read_all_available_send_message_data(exec_ctx, calld); + error = read_all_available_send_message_data(calld); if (error != GRPC_ERROR_NONE) goto done; // If all the data has been read, then we can use GET. if (calld->send_message_bytes_read == calld->send_message_caching_stream.base.length) { method = GRPC_MDELEM_METHOD_GET; - error = update_path_for_get(exec_ctx, elem, batch); + error = update_path_for_get(elem, batch); if (error != GRPC_ERROR_NONE) goto done; batch->send_message = false; - grpc_byte_stream_destroy(exec_ctx, - &calld->send_message_caching_stream.base); + grpc_byte_stream_destroy(&calld->send_message_caching_stream.base); } else { // Not all data is available. The batch will be sent down // asynchronously in on_send_message_next_done(). @@ -372,41 +356,41 @@ static void hc_start_transport_stream_op_batch( } remove_if_present( - exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata, GRPC_BATCH_METHOD); remove_if_present( - exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata, GRPC_BATCH_SCHEME); remove_if_present( - exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata, GRPC_BATCH_TE); remove_if_present( - exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata, GRPC_BATCH_CONTENT_TYPE); remove_if_present( - exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata, GRPC_BATCH_USER_AGENT); /* Send : prefixed headers, which have to be before any application layer headers. */ error = grpc_metadata_batch_add_head( - exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata, &calld->method, method); if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_head( - exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata, &calld->scheme, channeld->static_scheme); if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_tail( - exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata, &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS); if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_tail( - exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata, &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_tail( - exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata, &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent)); if (error != GRPC_ERROR_NONE) goto done; } @@ -414,16 +398,15 @@ static void hc_start_transport_stream_op_batch( done: if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error, calld->call_combiner); + calld->send_message_batch, error, calld->call_combiner); } else if (!batch_will_be_handled_asynchronously) { - grpc_call_next_op(exec_ctx, elem, batch); + grpc_call_next_op(elem, batch); } GPR_TIMER_END("hc_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ -static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = (call_data*)elem->call_data; calld->call_combiner = args->call_combiner; @@ -441,7 +424,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, +static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) {} @@ -533,8 +516,7 @@ static grpc_slice user_agent_from_args(const grpc_channel_args* args, } /* Constructor for channel_data */ -static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { channel_data* chand = (channel_data*)elem->channel_data; GPR_ASSERT(!args->is_last); @@ -543,17 +525,16 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, chand->max_payload_size_for_get = max_payload_size_from_args(args->channel_args); chand->user_agent = grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_USER_AGENT, + GRPC_MDSTR_USER_AGENT, user_agent_from_args(args->channel_args, args->optional_transport->vtable->name)); return GRPC_ERROR_NONE; } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) { +static void destroy_channel_elem(grpc_channel_element* elem) { channel_data* chand = (channel_data*)elem->channel_data; - GRPC_MDELEM_UNREF(exec_ctx, chand->user_agent); + GRPC_MDELEM_UNREF(chand->user_agent); } const grpc_channel_filter grpc_http_client_filter = { diff --git a/src/core/ext/filters/http/http_filters_plugin.cc b/src/core/ext/filters/http/http_filters_plugin.cc index 483eb021e8..deec77c96f 100644 --- a/src/core/ext/filters/http/http_filters_plugin.cc +++ b/src/core/ext/filters/http/http_filters_plugin.cc @@ -40,8 +40,7 @@ static bool is_building_http_like_transport( return t != nullptr && strstr(t->vtable->name, "http"); } -static bool maybe_add_optional_filter(grpc_exec_ctx* exec_ctx, - grpc_channel_stack_builder* builder, +static bool maybe_add_optional_filter(grpc_channel_stack_builder* builder, void* arg) { if (!is_building_http_like_transport(builder)) return true; optional_filter* filtarg = (optional_filter*)arg; @@ -55,8 +54,7 @@ static bool maybe_add_optional_filter(grpc_exec_ctx* exec_ctx, : true; } -static bool maybe_add_required_filter(grpc_exec_ctx* exec_ctx, - grpc_channel_stack_builder* builder, +static bool maybe_add_required_filter(grpc_channel_stack_builder* builder, void* arg) { return is_building_http_like_transport(builder) ? grpc_channel_stack_builder_prepend_filter( diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index d070b56b6a..9ae13d2ed2 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -100,12 +100,11 @@ static bool skip_compression(grpc_call_element* elem, uint32_t flags, /** Filter initial metadata */ static grpc_error* process_send_initial_metadata( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_metadata_batch* initial_metadata, + grpc_call_element* elem, grpc_metadata_batch* initial_metadata, bool* has_compression_algorithm) GRPC_MUST_USE_RESULT; static grpc_error* process_send_initial_metadata( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_metadata_batch* initial_metadata, bool* has_compression_algorithm) { + grpc_call_element* elem, grpc_metadata_batch* initial_metadata, + bool* has_compression_algorithm) { call_data* calld = (call_data*)elem->call_data; channel_data* channeld = (channel_data*)elem->channel_data; *has_compression_algorithm = false; @@ -137,13 +136,13 @@ static grpc_error* process_send_initial_metadata( } *has_compression_algorithm = true; grpc_metadata_batch_remove( - exec_ctx, initial_metadata, + initial_metadata, initial_metadata->idx.named.grpc_internal_stream_encoding_request); /* Disable message-wise compression */ calld->compression_algorithm = GRPC_COMPRESS_NONE; if (initial_metadata->idx.named.grpc_internal_encoding_request != nullptr) { grpc_metadata_batch_remove( - exec_ctx, initial_metadata, + initial_metadata, initial_metadata->idx.named.grpc_internal_encoding_request); } } else if (initial_metadata->idx.named.grpc_internal_encoding_request != @@ -160,7 +159,7 @@ static grpc_error* process_send_initial_metadata( } *has_compression_algorithm = true; grpc_metadata_batch_remove( - exec_ctx, initial_metadata, + initial_metadata, initial_metadata->idx.named.grpc_internal_encoding_request); } else { /* If no algorithm was found in the metadata and we aren't @@ -181,12 +180,11 @@ static grpc_error* process_send_initial_metadata( /* hint compression algorithm */ if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) { error = grpc_metadata_batch_add_tail( - exec_ctx, initial_metadata, - &calld->stream_compression_algorithm_storage, + initial_metadata, &calld->stream_compression_algorithm_storage, grpc_stream_compression_encoding_mdelem(stream_compression_algorithm)); } else if (calld->compression_algorithm != GRPC_COMPRESS_NONE) { error = grpc_metadata_batch_add_tail( - exec_ctx, initial_metadata, &calld->compression_algorithm_storage, + initial_metadata, &calld->compression_algorithm_storage, grpc_compression_encoding_mdelem(calld->compression_algorithm)); } @@ -194,7 +192,7 @@ static grpc_error* process_send_initial_metadata( /* convey supported compression algorithms */ error = grpc_metadata_batch_add_tail( - exec_ctx, initial_metadata, &calld->accept_encoding_storage, + initial_metadata, &calld->accept_encoding_storage, GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS( channeld->supported_compression_algorithms)); @@ -203,7 +201,7 @@ static grpc_error* process_send_initial_metadata( /* Do not overwrite accept-encoding header if it already presents. */ if (!initial_metadata->idx.named.accept_encoding) { error = grpc_metadata_batch_add_tail( - exec_ctx, initial_metadata, &calld->accept_stream_encoding_storage, + initial_metadata, &calld->accept_stream_encoding_storage, GRPC_MDELEM_ACCEPT_STREAM_ENCODING_FOR_ALGORITHMS( channeld->supported_stream_compression_algorithms)); } @@ -211,17 +209,15 @@ static grpc_error* process_send_initial_metadata( return error; } -static void send_message_on_complete(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void send_message_on_complete(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); - GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete, + grpc_slice_buffer_reset_and_unref_internal(&calld->slices); + GRPC_CLOSURE_RUN(calld->original_send_message_on_complete, GRPC_ERROR_REF(error)); } -static void send_message_batch_continue(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +static void send_message_batch_continue(grpc_call_element* elem) { call_data* calld = (call_data*)elem->call_data; // Note: The call to grpc_call_next_op() results in yielding the // call combiner, so we need to clear calld->send_message_batch @@ -229,19 +225,18 @@ static void send_message_batch_continue(grpc_exec_ctx* exec_ctx, grpc_transport_stream_op_batch* send_message_batch = calld->send_message_batch; calld->send_message_batch = nullptr; - grpc_call_next_op(exec_ctx, elem, send_message_batch); + grpc_call_next_op(elem, send_message_batch); } -static void finish_send_message(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +static void finish_send_message(grpc_call_element* elem) { call_data* calld = (call_data*)elem->call_data; // Compress the data if appropriate. grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); uint32_t send_flags = calld->send_message_batch->payload->send_message.send_message->flags; - bool did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, - &calld->slices, &tmp); + bool did_compress = + grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { if (grpc_compression_trace.enabled()) { const char* algo_name; @@ -268,11 +263,11 @@ static void finish_send_message(grpc_exec_ctx* exec_ctx, algo_name, calld->slices.length); } } - grpc_slice_buffer_destroy_internal(exec_ctx, &tmp); + grpc_slice_buffer_destroy_internal(&tmp); // Swap out the original byte stream with our new one and send the // batch down. grpc_byte_stream_destroy( - exec_ctx, calld->send_message_batch->payload->send_message.send_message); + calld->send_message_batch->payload->send_message.send_message); grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, send_flags); calld->send_message_batch->payload->send_message.send_message = @@ -280,27 +275,24 @@ static void finish_send_message(grpc_exec_ctx* exec_ctx, calld->original_send_message_on_complete = calld->send_message_batch->on_complete; calld->send_message_batch->on_complete = &calld->send_message_on_complete; - send_message_batch_continue(exec_ctx, elem); + send_message_batch_continue(elem); } -static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx* exec_ctx, - void* arg, +static void fail_send_message_batch_in_call_combiner(void* arg, grpc_error* error) { call_data* calld = (call_data*)arg; if (calld->send_message_batch != nullptr) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, GRPC_ERROR_REF(error), - calld->call_combiner); + calld->send_message_batch, GRPC_ERROR_REF(error), calld->call_combiner); calld->send_message_batch = nullptr; } } // Pulls a slice from the send_message byte stream and adds it to calld->slices. -static grpc_error* pull_slice_from_send_message(grpc_exec_ctx* exec_ctx, - call_data* calld) { +static grpc_error* pull_slice_from_send_message(call_data* calld) { grpc_slice incoming_slice; grpc_error* error = grpc_byte_stream_pull( - exec_ctx, calld->send_message_batch->payload->send_message.send_message, + calld->send_message_batch->payload->send_message.send_message, &incoming_slice); if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&calld->slices, incoming_slice); @@ -312,69 +304,65 @@ static grpc_error* pull_slice_from_send_message(grpc_exec_ctx* exec_ctx, // If all data has been read, invokes finish_send_message(). Otherwise, // an async call to grpc_byte_stream_next() has been started, which will // eventually result in calling on_send_message_next_done(). -static void continue_reading_send_message(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +static void continue_reading_send_message(grpc_call_element* elem) { call_data* calld = (call_data*)elem->call_data; while (grpc_byte_stream_next( - exec_ctx, calld->send_message_batch->payload->send_message.send_message, - ~(size_t)0, &calld->on_send_message_next_done)) { - grpc_error* error = pull_slice_from_send_message(exec_ctx, calld); + calld->send_message_batch->payload->send_message.send_message, ~(size_t)0, + &calld->on_send_message_next_done)) { + grpc_error* error = pull_slice_from_send_message(calld); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); + fail_send_message_batch_in_call_combiner(calld, error); GRPC_ERROR_UNREF(error); return; } if (calld->slices.length == calld->send_message_batch->payload->send_message.send_message->length) { - finish_send_message(exec_ctx, elem); + finish_send_message(elem); break; } } } // Async callback for grpc_byte_stream_next(). -static void on_send_message_next_done(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void on_send_message_next_done(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); + fail_send_message_batch_in_call_combiner(calld, error); return; } - error = pull_slice_from_send_message(exec_ctx, calld); + error = pull_slice_from_send_message(calld); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); + fail_send_message_batch_in_call_combiner(calld, error); GRPC_ERROR_UNREF(error); return; } if (calld->slices.length == calld->send_message_batch->payload->send_message.send_message->length) { - finish_send_message(exec_ctx, elem); + finish_send_message(elem); } else { - continue_reading_send_message(exec_ctx, elem); + continue_reading_send_message(elem); } } -static void start_send_message_batch(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* unused) { +static void start_send_message_batch(void* arg, grpc_error* unused) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; if (skip_compression( elem, calld->send_message_batch->payload->send_message.send_message->flags, calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) { - send_message_batch_continue(exec_ctx, elem); + send_message_batch_continue(elem); } else { - continue_reading_send_message(exec_ctx, elem); + continue_reading_send_message(elem); } } static void compress_start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* batch) { + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = (call_data*)elem->call_data; GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); // Handle cancel_stream. @@ -385,21 +373,19 @@ static void compress_start_transport_stream_op_batch( if (calld->send_message_batch != nullptr) { if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) { GRPC_CALL_COMBINER_START( - exec_ctx, calld->call_combiner, + calld->call_combiner, GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld, grpc_schedule_on_exec_ctx), GRPC_ERROR_REF(calld->cancel_error), "failing send_message op"); } else { grpc_byte_stream_shutdown( - exec_ctx, calld->send_message_batch->payload->send_message.send_message, GRPC_ERROR_REF(calld->cancel_error)); } } } else if (calld->cancel_error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, batch, GRPC_ERROR_REF(calld->cancel_error), - calld->call_combiner); + batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); goto done; } // Handle send_initial_metadata. @@ -407,11 +393,10 @@ static void compress_start_transport_stream_op_batch( GPR_ASSERT(calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN); bool has_compression_algorithm; grpc_error* error = process_send_initial_metadata( - exec_ctx, elem, - batch->payload->send_initial_metadata.send_initial_metadata, + elem, batch->payload->send_initial_metadata.send_initial_metadata, &has_compression_algorithm); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error, + grpc_transport_stream_op_batch_finish_with_failure(batch, error, calld->call_combiner); goto done; } @@ -425,7 +410,7 @@ static void compress_start_transport_stream_op_batch( // the call stack) will release the call combiner for each batch it sees. if (calld->send_message_batch != nullptr) { GRPC_CALL_COMBINER_START( - exec_ctx, calld->call_combiner, + calld->call_combiner, &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE, "starting send_message after send_initial_metadata"); } @@ -440,22 +425,21 @@ static void compress_start_transport_stream_op_batch( // send_initial_metadata. if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) { GRPC_CALL_COMBINER_STOP( - exec_ctx, calld->call_combiner, + calld->call_combiner, "send_message batch pending send_initial_metadata"); goto done; } - start_send_message_batch(exec_ctx, elem, GRPC_ERROR_NONE); + start_send_message_batch(elem, GRPC_ERROR_NONE); } else { // Pass control down the stack. - grpc_call_next_op(exec_ctx, elem, batch); + grpc_call_next_op(elem, batch); } done: GPR_TIMER_END("compress_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ -static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = (call_data*)elem->call_data; calld->call_combiner = args->call_combiner; @@ -471,17 +455,16 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, +static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = (call_data*)elem->call_data; - grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); + grpc_slice_buffer_destroy_internal(&calld->slices); GRPC_ERROR_UNREF(calld->cancel_error); } /* Constructor for channel_data */ -static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { channel_data* channeld = (channel_data*)elem->channel_data; @@ -531,8 +514,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) {} +static void destroy_channel_elem(grpc_channel_element* elem) {} const grpc_channel_filter grpc_message_compress_filter = { compress_start_transport_stream_op_batch, diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 4f3897915c..b872dc98f5 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -66,8 +66,7 @@ typedef struct channel_data { uint8_t unused; } channel_data; -static grpc_error* server_filter_outgoing_metadata(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static grpc_error* server_filter_outgoing_metadata(grpc_call_element* elem, grpc_metadata_batch* b) { if (b->idx.named.grpc_message != nullptr) { grpc_slice pct_encoded_msg = grpc_percent_encode_slice( @@ -75,10 +74,9 @@ static grpc_error* server_filter_outgoing_metadata(grpc_exec_ctx* exec_ctx, grpc_compatible_percent_encoding_unreserved_bytes); if (grpc_slice_is_equivalent(pct_encoded_msg, GRPC_MDVALUE(b->idx.named.grpc_message->md))) { - grpc_slice_unref_internal(exec_ctx, pct_encoded_msg); + grpc_slice_unref_internal(pct_encoded_msg); } else { - grpc_metadata_batch_set_value(exec_ctx, b->idx.named.grpc_message, - pct_encoded_msg); + grpc_metadata_batch_set_value(b->idx.named.grpc_message, pct_encoded_msg); } } return GRPC_ERROR_NONE; @@ -93,8 +91,7 @@ static void add_error(const char* error_name, grpc_error** cumulative, *cumulative = grpc_error_add_child(*cumulative, new_err); } -static grpc_error* server_filter_incoming_metadata(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem, grpc_metadata_batch* b) { call_data* calld = (call_data*)elem->call_data; grpc_error* error = GRPC_ERROR_NONE; @@ -123,7 +120,7 @@ static grpc_error* server_filter_incoming_metadata(grpc_exec_ctx* exec_ctx, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), b->idx.named.method->md)); } - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.method); + grpc_metadata_batch_remove(b, b->idx.named.method); } else { add_error( error_name, &error, @@ -139,7 +136,7 @@ static grpc_error* server_filter_incoming_metadata(grpc_exec_ctx* exec_ctx, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), b->idx.named.te->md)); } - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.te); + grpc_metadata_batch_remove(b, b->idx.named.te); } else { add_error(error_name, &error, grpc_error_set_str( @@ -156,7 +153,7 @@ static grpc_error* server_filter_incoming_metadata(grpc_exec_ctx* exec_ctx, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), b->idx.named.scheme->md)); } - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.scheme); + grpc_metadata_batch_remove(b, b->idx.named.scheme); } else { add_error( error_name, &error, @@ -191,7 +188,7 @@ static grpc_error* server_filter_incoming_metadata(grpc_exec_ctx* exec_ctx, gpr_free(val); } } - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_type); + grpc_metadata_batch_remove(b, b->idx.named.content_type); } if (b->idx.named.path == nullptr) { @@ -218,22 +215,21 @@ static grpc_error* server_filter_incoming_metadata(grpc_exec_ctx* exec_ctx, /* substitute path metadata with just the path (not query) */ grpc_mdelem mdelem_path_without_query = grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_PATH, grpc_slice_sub(path_slice, 0, offset)); + GRPC_MDSTR_PATH, grpc_slice_sub(path_slice, 0, offset)); - grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, + grpc_metadata_batch_substitute(b, b->idx.named.path, mdelem_path_without_query); /* decode payload from query and add to the slice buffer to be returned */ const int k_url_safe = 1; - grpc_slice_buffer_add( - &calld->read_slice_buffer, - grpc_base64_decode_with_len( - exec_ctx, (const char*)GRPC_SLICE_START_PTR(query_slice), - GRPC_SLICE_LENGTH(query_slice), k_url_safe)); + grpc_slice_buffer_add(&calld->read_slice_buffer, + grpc_base64_decode_with_len( + (const char*)GRPC_SLICE_START_PTR(query_slice), + GRPC_SLICE_LENGTH(query_slice), k_url_safe)); grpc_slice_buffer_stream_init(&calld->read_stream, &calld->read_slice_buffer, 0); calld->seen_path_with_query = true; - grpc_slice_unref_internal(exec_ctx, query_slice); + grpc_slice_unref_internal(query_slice); } else { gpr_log(GPR_ERROR, "GET request without QUERY"); } @@ -242,14 +238,14 @@ static grpc_error* server_filter_incoming_metadata(grpc_exec_ctx* exec_ctx, if (b->idx.named.host != nullptr && b->idx.named.authority == nullptr) { grpc_linked_mdelem* el = b->idx.named.host; grpc_mdelem md = GRPC_MDELEM_REF(el->md); - grpc_metadata_batch_remove(exec_ctx, b, el); + grpc_metadata_batch_remove(b, el); add_error(error_name, &error, grpc_metadata_batch_add_head( - exec_ctx, b, el, + b, el, grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_AUTHORITY, + GRPC_MDSTR_AUTHORITY, grpc_slice_ref_internal(GRPC_MDVALUE(md))))); - GRPC_MDELEM_UNREF(exec_ctx, md); + GRPC_MDELEM_UNREF(md); } if (b->idx.named.authority == nullptr) { @@ -263,21 +259,18 @@ static grpc_error* server_filter_incoming_metadata(grpc_exec_ctx* exec_ctx, return error; } -static void hs_on_recv(grpc_exec_ctx* exec_ctx, void* user_data, - grpc_error* err) { +static void hs_on_recv(void* user_data, grpc_error* err) { grpc_call_element* elem = (grpc_call_element*)user_data; call_data* calld = (call_data*)elem->call_data; if (err == GRPC_ERROR_NONE) { - err = server_filter_incoming_metadata(exec_ctx, elem, - calld->recv_initial_metadata); + err = server_filter_incoming_metadata(elem, calld->recv_initial_metadata); } else { GRPC_ERROR_REF(err); } - GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv, err); + GRPC_CLOSURE_RUN(calld->on_done_recv, err); } -static void hs_on_complete(grpc_exec_ctx* exec_ctx, void* user_data, - grpc_error* err) { +static void hs_on_complete(void* user_data, grpc_error* err) { grpc_call_element* elem = (grpc_call_element*)user_data; call_data* calld = (call_data*)elem->call_data; /* Call recv_message_ready if we got the payload via the path field */ @@ -287,17 +280,16 @@ static void hs_on_complete(grpc_exec_ctx* exec_ctx, void* user_data, : (grpc_byte_stream*)&calld->read_stream; // Re-enter call combiner for recv_message_ready, since the surface // code will release the call combiner for each callback it receives. - GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner, - calld->recv_message_ready, GRPC_ERROR_REF(err), + GRPC_CALL_COMBINER_START(calld->call_combiner, calld->recv_message_ready, + GRPC_ERROR_REF(err), "resuming recv_message_ready from on_complete"); calld->recv_message_ready = nullptr; calld->payload_bin_delivered = true; } - GRPC_CLOSURE_RUN(exec_ctx, calld->on_complete, GRPC_ERROR_REF(err)); + GRPC_CLOSURE_RUN(calld->on_complete, GRPC_ERROR_REF(err)); } -static void hs_recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, - grpc_error* err) { +static void hs_recv_message_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = (grpc_call_element*)user_data; call_data* calld = (call_data*)elem->call_data; if (calld->seen_path_with_query) { @@ -305,15 +297,14 @@ static void hs_recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, // returned in hs_on_complete callback. // Note that we release the call combiner here, so that other // callbacks can run. - GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pausing recv_message_ready until on_complete"); } else { - GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); + GRPC_CLOSURE_RUN(calld->recv_message_ready, GRPC_ERROR_REF(err)); } } -static grpc_error* hs_mutate_op(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static grpc_error* hs_mutate_op(grpc_call_element* elem, grpc_transport_stream_op_batch* op) { /* grab pointers to our data from the call element */ call_data* calld = (call_data*)elem->call_data; @@ -321,21 +312,19 @@ static grpc_error* hs_mutate_op(grpc_exec_ctx* exec_ctx, if (op->send_initial_metadata) { grpc_error* error = GRPC_ERROR_NONE; static const char* error_name = "Failed sending initial metadata"; + add_error(error_name, &error, + grpc_metadata_batch_add_head( + op->payload->send_initial_metadata.send_initial_metadata, + &calld->status, GRPC_MDELEM_STATUS_200)); + add_error(error_name, &error, + grpc_metadata_batch_add_tail( + op->payload->send_initial_metadata.send_initial_metadata, + &calld->content_type, + GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC)); add_error( error_name, &error, - grpc_metadata_batch_add_head( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, - &calld->status, GRPC_MDELEM_STATUS_200)); - add_error( - error_name, &error, - grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, - &calld->content_type, - GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC)); - add_error(error_name, &error, - server_filter_outgoing_metadata( - exec_ctx, elem, - op->payload->send_initial_metadata.send_initial_metadata)); + server_filter_outgoing_metadata( + elem, op->payload->send_initial_metadata.send_initial_metadata)); if (error != GRPC_ERROR_NONE) return error; } @@ -367,8 +356,7 @@ static grpc_error* hs_mutate_op(grpc_exec_ctx* exec_ctx, if (op->send_trailing_metadata) { grpc_error* error = server_filter_outgoing_metadata( - exec_ctx, elem, - op->payload->send_trailing_metadata.send_trailing_metadata); + elem, op->payload->send_trailing_metadata.send_trailing_metadata); if (error != GRPC_ERROR_NONE) return error; } @@ -376,23 +364,21 @@ static grpc_error* hs_mutate_op(grpc_exec_ctx* exec_ctx, } static void hs_start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* op) { + grpc_call_element* elem, grpc_transport_stream_op_batch* op) { call_data* calld = (call_data*)elem->call_data; GPR_TIMER_BEGIN("hs_start_transport_stream_op_batch", 0); - grpc_error* error = hs_mutate_op(exec_ctx, elem, op); + grpc_error* error = hs_mutate_op(elem, op); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error, + grpc_transport_stream_op_batch_finish_with_failure(op, error, calld->call_combiner); } else { - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(elem, op); } GPR_TIMER_END("hs_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ -static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { /* grab pointers to our data from the call element */ call_data* calld = (call_data*)elem->call_data; @@ -409,24 +395,22 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, +static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = (call_data*)elem->call_data; - grpc_slice_buffer_destroy_internal(exec_ctx, &calld->read_slice_buffer); + grpc_slice_buffer_destroy_internal(&calld->read_slice_buffer); } /* Constructor for channel_data */ -static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); return GRPC_ERROR_NONE; } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) {} +static void destroy_channel_elem(grpc_channel_element* elem) {} const grpc_channel_filter grpc_http_server_filter = { hs_start_transport_stream_op_batch, |