diff options
Diffstat (limited to 'src/core/ext/filters/http/message_compress')
-rw-r--r-- | src/core/ext/filters/http/message_compress/message_compress_filter.cc | 126 |
1 files changed, 54 insertions, 72 deletions
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index d070b56b6a..9ae13d2ed2 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -100,12 +100,11 @@ static bool skip_compression(grpc_call_element* elem, uint32_t flags, /** Filter initial metadata */ static grpc_error* process_send_initial_metadata( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_metadata_batch* initial_metadata, + grpc_call_element* elem, grpc_metadata_batch* initial_metadata, bool* has_compression_algorithm) GRPC_MUST_USE_RESULT; static grpc_error* process_send_initial_metadata( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_metadata_batch* initial_metadata, bool* has_compression_algorithm) { + grpc_call_element* elem, grpc_metadata_batch* initial_metadata, + bool* has_compression_algorithm) { call_data* calld = (call_data*)elem->call_data; channel_data* channeld = (channel_data*)elem->channel_data; *has_compression_algorithm = false; @@ -137,13 +136,13 @@ static grpc_error* process_send_initial_metadata( } *has_compression_algorithm = true; grpc_metadata_batch_remove( - exec_ctx, initial_metadata, + initial_metadata, initial_metadata->idx.named.grpc_internal_stream_encoding_request); /* Disable message-wise compression */ calld->compression_algorithm = GRPC_COMPRESS_NONE; if (initial_metadata->idx.named.grpc_internal_encoding_request != nullptr) { grpc_metadata_batch_remove( - exec_ctx, initial_metadata, + initial_metadata, initial_metadata->idx.named.grpc_internal_encoding_request); } } else if (initial_metadata->idx.named.grpc_internal_encoding_request != @@ -160,7 +159,7 @@ static grpc_error* process_send_initial_metadata( } *has_compression_algorithm = true; grpc_metadata_batch_remove( - exec_ctx, initial_metadata, + initial_metadata, initial_metadata->idx.named.grpc_internal_encoding_request); } else { /* If no algorithm was found in the metadata and we aren't @@ -181,12 +180,11 @@ static grpc_error* process_send_initial_metadata( /* hint compression algorithm */ if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) { error = grpc_metadata_batch_add_tail( - exec_ctx, initial_metadata, - &calld->stream_compression_algorithm_storage, + initial_metadata, &calld->stream_compression_algorithm_storage, grpc_stream_compression_encoding_mdelem(stream_compression_algorithm)); } else if (calld->compression_algorithm != GRPC_COMPRESS_NONE) { error = grpc_metadata_batch_add_tail( - exec_ctx, initial_metadata, &calld->compression_algorithm_storage, + initial_metadata, &calld->compression_algorithm_storage, grpc_compression_encoding_mdelem(calld->compression_algorithm)); } @@ -194,7 +192,7 @@ static grpc_error* process_send_initial_metadata( /* convey supported compression algorithms */ error = grpc_metadata_batch_add_tail( - exec_ctx, initial_metadata, &calld->accept_encoding_storage, + initial_metadata, &calld->accept_encoding_storage, GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS( channeld->supported_compression_algorithms)); @@ -203,7 +201,7 @@ static grpc_error* process_send_initial_metadata( /* Do not overwrite accept-encoding header if it already presents. */ if (!initial_metadata->idx.named.accept_encoding) { error = grpc_metadata_batch_add_tail( - exec_ctx, initial_metadata, &calld->accept_stream_encoding_storage, + initial_metadata, &calld->accept_stream_encoding_storage, GRPC_MDELEM_ACCEPT_STREAM_ENCODING_FOR_ALGORITHMS( channeld->supported_stream_compression_algorithms)); } @@ -211,17 +209,15 @@ static grpc_error* process_send_initial_metadata( return error; } -static void send_message_on_complete(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void send_message_on_complete(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); - GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete, + grpc_slice_buffer_reset_and_unref_internal(&calld->slices); + GRPC_CLOSURE_RUN(calld->original_send_message_on_complete, GRPC_ERROR_REF(error)); } -static void send_message_batch_continue(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +static void send_message_batch_continue(grpc_call_element* elem) { call_data* calld = (call_data*)elem->call_data; // Note: The call to grpc_call_next_op() results in yielding the // call combiner, so we need to clear calld->send_message_batch @@ -229,19 +225,18 @@ static void send_message_batch_continue(grpc_exec_ctx* exec_ctx, grpc_transport_stream_op_batch* send_message_batch = calld->send_message_batch; calld->send_message_batch = nullptr; - grpc_call_next_op(exec_ctx, elem, send_message_batch); + grpc_call_next_op(elem, send_message_batch); } -static void finish_send_message(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +static void finish_send_message(grpc_call_element* elem) { call_data* calld = (call_data*)elem->call_data; // Compress the data if appropriate. grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); uint32_t send_flags = calld->send_message_batch->payload->send_message.send_message->flags; - bool did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, - &calld->slices, &tmp); + bool did_compress = + grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { if (grpc_compression_trace.enabled()) { const char* algo_name; @@ -268,11 +263,11 @@ static void finish_send_message(grpc_exec_ctx* exec_ctx, algo_name, calld->slices.length); } } - grpc_slice_buffer_destroy_internal(exec_ctx, &tmp); + grpc_slice_buffer_destroy_internal(&tmp); // Swap out the original byte stream with our new one and send the // batch down. grpc_byte_stream_destroy( - exec_ctx, calld->send_message_batch->payload->send_message.send_message); + calld->send_message_batch->payload->send_message.send_message); grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, send_flags); calld->send_message_batch->payload->send_message.send_message = @@ -280,27 +275,24 @@ static void finish_send_message(grpc_exec_ctx* exec_ctx, calld->original_send_message_on_complete = calld->send_message_batch->on_complete; calld->send_message_batch->on_complete = &calld->send_message_on_complete; - send_message_batch_continue(exec_ctx, elem); + send_message_batch_continue(elem); } -static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx* exec_ctx, - void* arg, +static void fail_send_message_batch_in_call_combiner(void* arg, grpc_error* error) { call_data* calld = (call_data*)arg; if (calld->send_message_batch != nullptr) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, GRPC_ERROR_REF(error), - calld->call_combiner); + calld->send_message_batch, GRPC_ERROR_REF(error), calld->call_combiner); calld->send_message_batch = nullptr; } } // Pulls a slice from the send_message byte stream and adds it to calld->slices. -static grpc_error* pull_slice_from_send_message(grpc_exec_ctx* exec_ctx, - call_data* calld) { +static grpc_error* pull_slice_from_send_message(call_data* calld) { grpc_slice incoming_slice; grpc_error* error = grpc_byte_stream_pull( - exec_ctx, calld->send_message_batch->payload->send_message.send_message, + calld->send_message_batch->payload->send_message.send_message, &incoming_slice); if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&calld->slices, incoming_slice); @@ -312,69 +304,65 @@ static grpc_error* pull_slice_from_send_message(grpc_exec_ctx* exec_ctx, // If all data has been read, invokes finish_send_message(). Otherwise, // an async call to grpc_byte_stream_next() has been started, which will // eventually result in calling on_send_message_next_done(). -static void continue_reading_send_message(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +static void continue_reading_send_message(grpc_call_element* elem) { call_data* calld = (call_data*)elem->call_data; while (grpc_byte_stream_next( - exec_ctx, calld->send_message_batch->payload->send_message.send_message, - ~(size_t)0, &calld->on_send_message_next_done)) { - grpc_error* error = pull_slice_from_send_message(exec_ctx, calld); + calld->send_message_batch->payload->send_message.send_message, ~(size_t)0, + &calld->on_send_message_next_done)) { + grpc_error* error = pull_slice_from_send_message(calld); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); + fail_send_message_batch_in_call_combiner(calld, error); GRPC_ERROR_UNREF(error); return; } if (calld->slices.length == calld->send_message_batch->payload->send_message.send_message->length) { - finish_send_message(exec_ctx, elem); + finish_send_message(elem); break; } } } // Async callback for grpc_byte_stream_next(). -static void on_send_message_next_done(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void on_send_message_next_done(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); + fail_send_message_batch_in_call_combiner(calld, error); return; } - error = pull_slice_from_send_message(exec_ctx, calld); + error = pull_slice_from_send_message(calld); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); + fail_send_message_batch_in_call_combiner(calld, error); GRPC_ERROR_UNREF(error); return; } if (calld->slices.length == calld->send_message_batch->payload->send_message.send_message->length) { - finish_send_message(exec_ctx, elem); + finish_send_message(elem); } else { - continue_reading_send_message(exec_ctx, elem); + continue_reading_send_message(elem); } } -static void start_send_message_batch(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* unused) { +static void start_send_message_batch(void* arg, grpc_error* unused) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; if (skip_compression( elem, calld->send_message_batch->payload->send_message.send_message->flags, calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) { - send_message_batch_continue(exec_ctx, elem); + send_message_batch_continue(elem); } else { - continue_reading_send_message(exec_ctx, elem); + continue_reading_send_message(elem); } } static void compress_start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* batch) { + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = (call_data*)elem->call_data; GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); // Handle cancel_stream. @@ -385,21 +373,19 @@ static void compress_start_transport_stream_op_batch( if (calld->send_message_batch != nullptr) { if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) { GRPC_CALL_COMBINER_START( - exec_ctx, calld->call_combiner, + calld->call_combiner, GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld, grpc_schedule_on_exec_ctx), GRPC_ERROR_REF(calld->cancel_error), "failing send_message op"); } else { grpc_byte_stream_shutdown( - exec_ctx, calld->send_message_batch->payload->send_message.send_message, GRPC_ERROR_REF(calld->cancel_error)); } } } else if (calld->cancel_error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, batch, GRPC_ERROR_REF(calld->cancel_error), - calld->call_combiner); + batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); goto done; } // Handle send_initial_metadata. @@ -407,11 +393,10 @@ static void compress_start_transport_stream_op_batch( GPR_ASSERT(calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN); bool has_compression_algorithm; grpc_error* error = process_send_initial_metadata( - exec_ctx, elem, - batch->payload->send_initial_metadata.send_initial_metadata, + elem, batch->payload->send_initial_metadata.send_initial_metadata, &has_compression_algorithm); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error, + grpc_transport_stream_op_batch_finish_with_failure(batch, error, calld->call_combiner); goto done; } @@ -425,7 +410,7 @@ static void compress_start_transport_stream_op_batch( // the call stack) will release the call combiner for each batch it sees. if (calld->send_message_batch != nullptr) { GRPC_CALL_COMBINER_START( - exec_ctx, calld->call_combiner, + calld->call_combiner, &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE, "starting send_message after send_initial_metadata"); } @@ -440,22 +425,21 @@ static void compress_start_transport_stream_op_batch( // send_initial_metadata. if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) { GRPC_CALL_COMBINER_STOP( - exec_ctx, calld->call_combiner, + calld->call_combiner, "send_message batch pending send_initial_metadata"); goto done; } - start_send_message_batch(exec_ctx, elem, GRPC_ERROR_NONE); + start_send_message_batch(elem, GRPC_ERROR_NONE); } else { // Pass control down the stack. - grpc_call_next_op(exec_ctx, elem, batch); + grpc_call_next_op(elem, batch); } done: GPR_TIMER_END("compress_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ -static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = (call_data*)elem->call_data; calld->call_combiner = args->call_combiner; @@ -471,17 +455,16 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, +static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = (call_data*)elem->call_data; - grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); + grpc_slice_buffer_destroy_internal(&calld->slices); GRPC_ERROR_UNREF(calld->cancel_error); } /* Constructor for channel_data */ -static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { channel_data* channeld = (channel_data*)elem->channel_data; @@ -531,8 +514,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) {} +static void destroy_channel_elem(grpc_channel_element* elem) {} const grpc_channel_filter grpc_message_compress_filter = { compress_start_transport_stream_op_batch, |