diff options
Diffstat (limited to 'src/core/ext/filters/http/message_compress/message_compress_filter.cc')
-rw-r--r-- | src/core/ext/filters/http/message_compress/message_compress_filter.cc | 126 |
1 files changed, 72 insertions, 54 deletions
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index 9ae13d2ed2..d070b56b6a 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -100,11 +100,12 @@ static bool skip_compression(grpc_call_element* elem, uint32_t flags, /** Filter initial metadata */ static grpc_error* process_send_initial_metadata( - grpc_call_element* elem, grpc_metadata_batch* initial_metadata, + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_metadata_batch* initial_metadata, bool* has_compression_algorithm) GRPC_MUST_USE_RESULT; static grpc_error* process_send_initial_metadata( - grpc_call_element* elem, grpc_metadata_batch* initial_metadata, - bool* has_compression_algorithm) { + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_metadata_batch* initial_metadata, bool* has_compression_algorithm) { call_data* calld = (call_data*)elem->call_data; channel_data* channeld = (channel_data*)elem->channel_data; *has_compression_algorithm = false; @@ -136,13 +137,13 @@ static grpc_error* process_send_initial_metadata( } *has_compression_algorithm = true; grpc_metadata_batch_remove( - initial_metadata, + exec_ctx, initial_metadata, initial_metadata->idx.named.grpc_internal_stream_encoding_request); /* Disable message-wise compression */ calld->compression_algorithm = GRPC_COMPRESS_NONE; if (initial_metadata->idx.named.grpc_internal_encoding_request != nullptr) { grpc_metadata_batch_remove( - initial_metadata, + exec_ctx, initial_metadata, initial_metadata->idx.named.grpc_internal_encoding_request); } } else if (initial_metadata->idx.named.grpc_internal_encoding_request != @@ -159,7 +160,7 @@ static grpc_error* process_send_initial_metadata( } *has_compression_algorithm = true; grpc_metadata_batch_remove( - initial_metadata, + exec_ctx, initial_metadata, initial_metadata->idx.named.grpc_internal_encoding_request); } else { /* If no algorithm was found in the metadata and we aren't @@ -180,11 +181,12 @@ static grpc_error* process_send_initial_metadata( /* hint compression algorithm */ if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) { error = grpc_metadata_batch_add_tail( - initial_metadata, &calld->stream_compression_algorithm_storage, + exec_ctx, initial_metadata, + &calld->stream_compression_algorithm_storage, grpc_stream_compression_encoding_mdelem(stream_compression_algorithm)); } else if (calld->compression_algorithm != GRPC_COMPRESS_NONE) { error = grpc_metadata_batch_add_tail( - initial_metadata, &calld->compression_algorithm_storage, + exec_ctx, initial_metadata, &calld->compression_algorithm_storage, grpc_compression_encoding_mdelem(calld->compression_algorithm)); } @@ -192,7 +194,7 @@ static grpc_error* process_send_initial_metadata( /* convey supported compression algorithms */ error = grpc_metadata_batch_add_tail( - initial_metadata, &calld->accept_encoding_storage, + exec_ctx, initial_metadata, &calld->accept_encoding_storage, GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS( channeld->supported_compression_algorithms)); @@ -201,7 +203,7 @@ static grpc_error* process_send_initial_metadata( /* Do not overwrite accept-encoding header if it already presents. */ if (!initial_metadata->idx.named.accept_encoding) { error = grpc_metadata_batch_add_tail( - initial_metadata, &calld->accept_stream_encoding_storage, + exec_ctx, initial_metadata, &calld->accept_stream_encoding_storage, GRPC_MDELEM_ACCEPT_STREAM_ENCODING_FOR_ALGORITHMS( channeld->supported_stream_compression_algorithms)); } @@ -209,15 +211,17 @@ static grpc_error* process_send_initial_metadata( return error; } -static void send_message_on_complete(void* arg, grpc_error* error) { +static void send_message_on_complete(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; - grpc_slice_buffer_reset_and_unref_internal(&calld->slices); - GRPC_CLOSURE_RUN(calld->original_send_message_on_complete, + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); + GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete, GRPC_ERROR_REF(error)); } -static void send_message_batch_continue(grpc_call_element* elem) { +static void send_message_batch_continue(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { call_data* calld = (call_data*)elem->call_data; // Note: The call to grpc_call_next_op() results in yielding the // call combiner, so we need to clear calld->send_message_batch @@ -225,18 +229,19 @@ static void send_message_batch_continue(grpc_call_element* elem) { grpc_transport_stream_op_batch* send_message_batch = calld->send_message_batch; calld->send_message_batch = nullptr; - grpc_call_next_op(elem, send_message_batch); + grpc_call_next_op(exec_ctx, elem, send_message_batch); } -static void finish_send_message(grpc_call_element* elem) { +static void finish_send_message(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { call_data* calld = (call_data*)elem->call_data; // Compress the data if appropriate. grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); uint32_t send_flags = calld->send_message_batch->payload->send_message.send_message->flags; - bool did_compress = - grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); + bool did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, + &calld->slices, &tmp); if (did_compress) { if (grpc_compression_trace.enabled()) { const char* algo_name; @@ -263,11 +268,11 @@ static void finish_send_message(grpc_call_element* elem) { algo_name, calld->slices.length); } } - grpc_slice_buffer_destroy_internal(&tmp); + grpc_slice_buffer_destroy_internal(exec_ctx, &tmp); // Swap out the original byte stream with our new one and send the // batch down. grpc_byte_stream_destroy( - calld->send_message_batch->payload->send_message.send_message); + exec_ctx, calld->send_message_batch->payload->send_message.send_message); grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, send_flags); calld->send_message_batch->payload->send_message.send_message = @@ -275,24 +280,27 @@ static void finish_send_message(grpc_call_element* elem) { calld->original_send_message_on_complete = calld->send_message_batch->on_complete; calld->send_message_batch->on_complete = &calld->send_message_on_complete; - send_message_batch_continue(elem); + send_message_batch_continue(exec_ctx, elem); } -static void fail_send_message_batch_in_call_combiner(void* arg, +static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx* exec_ctx, + void* arg, grpc_error* error) { call_data* calld = (call_data*)arg; if (calld->send_message_batch != nullptr) { grpc_transport_stream_op_batch_finish_with_failure( - calld->send_message_batch, GRPC_ERROR_REF(error), calld->call_combiner); + exec_ctx, calld->send_message_batch, GRPC_ERROR_REF(error), + calld->call_combiner); calld->send_message_batch = nullptr; } } // Pulls a slice from the send_message byte stream and adds it to calld->slices. -static grpc_error* pull_slice_from_send_message(call_data* calld) { +static grpc_error* pull_slice_from_send_message(grpc_exec_ctx* exec_ctx, + call_data* calld) { grpc_slice incoming_slice; grpc_error* error = grpc_byte_stream_pull( - calld->send_message_batch->payload->send_message.send_message, + exec_ctx, calld->send_message_batch->payload->send_message.send_message, &incoming_slice); if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&calld->slices, incoming_slice); @@ -304,65 +312,69 @@ static grpc_error* pull_slice_from_send_message(call_data* calld) { // If all data has been read, invokes finish_send_message(). Otherwise, // an async call to grpc_byte_stream_next() has been started, which will // eventually result in calling on_send_message_next_done(). -static void continue_reading_send_message(grpc_call_element* elem) { +static void continue_reading_send_message(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { call_data* calld = (call_data*)elem->call_data; while (grpc_byte_stream_next( - calld->send_message_batch->payload->send_message.send_message, ~(size_t)0, - &calld->on_send_message_next_done)) { - grpc_error* error = pull_slice_from_send_message(calld); + exec_ctx, calld->send_message_batch->payload->send_message.send_message, + ~(size_t)0, &calld->on_send_message_next_done)) { + grpc_error* error = pull_slice_from_send_message(exec_ctx, calld); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(calld, error); + fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); GRPC_ERROR_UNREF(error); return; } if (calld->slices.length == calld->send_message_batch->payload->send_message.send_message->length) { - finish_send_message(elem); + finish_send_message(exec_ctx, elem); break; } } } // Async callback for grpc_byte_stream_next(). -static void on_send_message_next_done(void* arg, grpc_error* error) { +static void on_send_message_next_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(calld, error); + fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); return; } - error = pull_slice_from_send_message(calld); + error = pull_slice_from_send_message(exec_ctx, calld); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(calld, error); + fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); GRPC_ERROR_UNREF(error); return; } if (calld->slices.length == calld->send_message_batch->payload->send_message.send_message->length) { - finish_send_message(elem); + finish_send_message(exec_ctx, elem); } else { - continue_reading_send_message(elem); + continue_reading_send_message(exec_ctx, elem); } } -static void start_send_message_batch(void* arg, grpc_error* unused) { +static void start_send_message_batch(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* unused) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; if (skip_compression( elem, calld->send_message_batch->payload->send_message.send_message->flags, calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) { - send_message_batch_continue(elem); + send_message_batch_continue(exec_ctx, elem); } else { - continue_reading_send_message(elem); + continue_reading_send_message(exec_ctx, elem); } } static void compress_start_transport_stream_op_batch( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* batch) { call_data* calld = (call_data*)elem->call_data; GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); // Handle cancel_stream. @@ -373,19 +385,21 @@ static void compress_start_transport_stream_op_batch( if (calld->send_message_batch != nullptr) { if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) { GRPC_CALL_COMBINER_START( - calld->call_combiner, + exec_ctx, calld->call_combiner, GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld, grpc_schedule_on_exec_ctx), GRPC_ERROR_REF(calld->cancel_error), "failing send_message op"); } else { grpc_byte_stream_shutdown( + exec_ctx, calld->send_message_batch->payload->send_message.send_message, GRPC_ERROR_REF(calld->cancel_error)); } } } else if (calld->cancel_error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); + exec_ctx, batch, GRPC_ERROR_REF(calld->cancel_error), + calld->call_combiner); goto done; } // Handle send_initial_metadata. @@ -393,10 +407,11 @@ static void compress_start_transport_stream_op_batch( GPR_ASSERT(calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN); bool has_compression_algorithm; grpc_error* error = process_send_initial_metadata( - elem, batch->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, elem, + batch->payload->send_initial_metadata.send_initial_metadata, &has_compression_algorithm); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(batch, error, + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error, calld->call_combiner); goto done; } @@ -410,7 +425,7 @@ static void compress_start_transport_stream_op_batch( // the call stack) will release the call combiner for each batch it sees. if (calld->send_message_batch != nullptr) { GRPC_CALL_COMBINER_START( - calld->call_combiner, + exec_ctx, calld->call_combiner, &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE, "starting send_message after send_initial_metadata"); } @@ -425,21 +440,22 @@ static void compress_start_transport_stream_op_batch( // send_initial_metadata. if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) { GRPC_CALL_COMBINER_STOP( - calld->call_combiner, + exec_ctx, calld->call_combiner, "send_message batch pending send_initial_metadata"); goto done; } - start_send_message_batch(elem, GRPC_ERROR_NONE); + start_send_message_batch(exec_ctx, elem, GRPC_ERROR_NONE); } else { // Pass control down the stack. - grpc_call_next_op(elem, batch); + grpc_call_next_op(exec_ctx, elem, batch); } done: GPR_TIMER_END("compress_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ -static grpc_error* init_call_elem(grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = (call_data*)elem->call_data; calld->call_combiner = args->call_combiner; @@ -455,16 +471,17 @@ static grpc_error* init_call_elem(grpc_call_element* elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element* elem, +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = (call_data*)elem->call_data; - grpc_slice_buffer_destroy_internal(&calld->slices); + grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); GRPC_ERROR_UNREF(calld->cancel_error); } /* Constructor for channel_data */ -static grpc_error* init_channel_elem(grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, grpc_channel_element_args* args) { channel_data* channeld = (channel_data*)elem->channel_data; @@ -514,7 +531,8 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element* elem) {} +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) {} const grpc_channel_filter grpc_message_compress_filter = { compress_start_transport_stream_op_batch, |