diff options
Diffstat (limited to 'src/core/ext/filters/http/message_compress/message_compress_filter.cc')
-rw-r--r-- | src/core/ext/filters/http/message_compress/message_compress_filter.cc | 145 |
1 files changed, 73 insertions, 72 deletions
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index f785e1355d..949ff917d6 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -45,7 +45,7 @@ typedef enum { } initial_metadata_state; typedef struct call_data { - grpc_call_combiner *call_combiner; + grpc_call_combiner* call_combiner; grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem stream_compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; @@ -54,12 +54,12 @@ typedef struct call_data { * metadata, or by the channel's default compression settings. */ grpc_compression_algorithm compression_algorithm; initial_metadata_state send_initial_metadata_state; - grpc_error *cancel_error; + grpc_error* cancel_error; grpc_closure start_send_message_batch_in_call_combiner; - grpc_transport_stream_op_batch *send_message_batch; + grpc_transport_stream_op_batch* send_message_batch; grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_slice_buffer_stream replacement_stream; - grpc_closure *original_send_message_on_complete; + grpc_closure* original_send_message_on_complete; grpc_closure send_message_on_complete; grpc_closure on_send_message_next_done; } call_data; @@ -80,10 +80,10 @@ typedef struct channel_data { uint32_t supported_stream_compression_algorithms; } channel_data; -static bool skip_compression(grpc_call_element *elem, uint32_t flags, +static bool skip_compression(grpc_call_element* elem, uint32_t flags, bool has_compression_algorithm) { - call_data *calld = (call_data *)elem->call_data; - channel_data *channeld = (channel_data *)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; + channel_data* channeld = (channel_data*)elem->channel_data; if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) { return true; @@ -99,15 +99,15 @@ static bool skip_compression(grpc_call_element *elem, uint32_t flags, } /** Filter initial metadata */ -static grpc_error *process_send_initial_metadata( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, - bool *has_compression_algorithm) GRPC_MUST_USE_RESULT; -static grpc_error *process_send_initial_metadata( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, bool *has_compression_algorithm) { - call_data *calld = (call_data *)elem->call_data; - channel_data *channeld = (channel_data *)elem->channel_data; +static grpc_error* process_send_initial_metadata( + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_metadata_batch* initial_metadata, + bool* has_compression_algorithm) GRPC_MUST_USE_RESULT; +static grpc_error* process_send_initial_metadata( + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_metadata_batch* initial_metadata, bool* has_compression_algorithm) { + call_data* calld = (call_data*)elem->call_data; + channel_data* channeld = (channel_data*)elem->channel_data; *has_compression_algorithm = false; grpc_stream_compression_algorithm stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; @@ -117,7 +117,7 @@ static grpc_error *process_send_initial_metadata( initial_metadata->idx.named.grpc_internal_stream_encoding_request->md; if (!grpc_stream_compression_algorithm_parse( GRPC_MDVALUE(md), &stream_compression_algorithm)) { - char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, "Invalid stream compression algorithm: '%s' (unknown). Ignoring.", val); @@ -126,7 +126,7 @@ static grpc_error *process_send_initial_metadata( } if (!GPR_BITGET(channeld->enabled_stream_compression_algorithms_bitset, stream_compression_algorithm)) { - char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log( GPR_ERROR, "Invalid stream compression algorithm: '%s' (previously disabled). " @@ -152,7 +152,7 @@ static grpc_error *process_send_initial_metadata( initial_metadata->idx.named.grpc_internal_encoding_request->md; if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md), &calld->compression_algorithm)) { - char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s' (unknown). Ignoring.", val); gpr_free(val); @@ -177,7 +177,7 @@ static grpc_error *process_send_initial_metadata( *has_compression_algorithm = true; } - grpc_error *error = GRPC_ERROR_NONE; + grpc_error* error = GRPC_ERROR_NONE; /* hint compression algorithm */ if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) { error = grpc_metadata_batch_add_tail( @@ -211,30 +211,30 @@ static grpc_error *process_send_initial_metadata( return error; } -static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_call_element *elem = (grpc_call_element *)arg; - call_data *calld = (call_data *)elem->call_data; +static void send_message_on_complete(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_call_element* elem = (grpc_call_element*)arg; + call_data* calld = (call_data*)elem->call_data; grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete, GRPC_ERROR_REF(error)); } -static void send_message_batch_continue(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - call_data *calld = (call_data *)elem->call_data; +static void send_message_batch_continue(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { + call_data* calld = (call_data*)elem->call_data; // Note: The call to grpc_call_next_op() results in yielding the // call combiner, so we need to clear calld->send_message_batch // before we do that. - grpc_transport_stream_op_batch *send_message_batch = + grpc_transport_stream_op_batch* send_message_batch = calld->send_message_batch; calld->send_message_batch = NULL; grpc_call_next_op(exec_ctx, elem, send_message_batch); } -static void finish_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - call_data *calld = (call_data *)elem->call_data; +static void finish_send_message(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { + call_data* calld = (call_data*)elem->call_data; // Compress the data if appropriate. grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); @@ -244,21 +244,22 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, &calld->slices, &tmp); if (did_compress) { if (GRPC_TRACER_ON(grpc_compression_trace)) { - const char *algo_name; + const char* algo_name; const size_t before_size = calld->slices.length; const size_t after_size = tmp.length; const float savings_ratio = 1.0f - (float)after_size / (float)before_size; GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, &algo_name)); - gpr_log(GPR_DEBUG, "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR - " bytes (%.2f%% savings)", + gpr_log(GPR_DEBUG, + "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR + " bytes (%.2f%% savings)", algo_name, before_size, after_size, 100 * savings_ratio); } grpc_slice_buffer_swap(&calld->slices, &tmp); send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { if (GRPC_TRACER_ON(grpc_compression_trace)) { - const char *algo_name; + const char* algo_name; GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, &algo_name)); gpr_log(GPR_DEBUG, @@ -282,10 +283,10 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, send_message_batch_continue(exec_ctx, elem); } -static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error) { - call_data *calld = (call_data *)arg; +static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx* exec_ctx, + void* arg, + grpc_error* error) { + call_data* calld = (call_data*)arg; if (calld->send_message_batch != NULL) { grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, calld->send_message_batch, GRPC_ERROR_REF(error), @@ -295,10 +296,10 @@ static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, } // Pulls a slice from the send_message byte stream and adds it to calld->slices. -static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx, - call_data *calld) { +static grpc_error* pull_slice_from_send_message(grpc_exec_ctx* exec_ctx, + call_data* calld) { grpc_slice incoming_slice; - grpc_error *error = grpc_byte_stream_pull( + grpc_error* error = grpc_byte_stream_pull( exec_ctx, calld->send_message_batch->payload->send_message.send_message, &incoming_slice); if (error == GRPC_ERROR_NONE) { @@ -311,13 +312,13 @@ static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx, // If all data has been read, invokes finish_send_message(). Otherwise, // an async call to grpc_byte_stream_next() has been started, which will // eventually result in calling on_send_message_next_done(). -static void continue_reading_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - call_data *calld = (call_data *)elem->call_data; +static void continue_reading_send_message(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { + call_data* calld = (call_data*)elem->call_data; while (grpc_byte_stream_next( exec_ctx, calld->send_message_batch->payload->send_message.send_message, ~(size_t)0, &calld->on_send_message_next_done)) { - grpc_error *error = pull_slice_from_send_message(exec_ctx, calld); + grpc_error* error = pull_slice_from_send_message(exec_ctx, calld); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); @@ -333,10 +334,10 @@ static void continue_reading_send_message(grpc_exec_ctx *exec_ctx, } // Async callback for grpc_byte_stream_next(). -static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_call_element *elem = (grpc_call_element *)arg; - call_data *calld = (call_data *)elem->call_data; +static void on_send_message_next_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_call_element* elem = (grpc_call_element*)arg; + call_data* calld = (call_data*)elem->call_data; if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); @@ -357,10 +358,10 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg, } } -static void start_send_message_batch(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *unused) { - grpc_call_element *elem = (grpc_call_element *)arg; - call_data *calld = (call_data *)elem->call_data; +static void start_send_message_batch(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* unused) { + grpc_call_element* elem = (grpc_call_element*)arg; + call_data* calld = (call_data*)elem->call_data; if (skip_compression( elem, calld->send_message_batch->payload->send_message.send_message->flags, @@ -372,9 +373,9 @@ static void start_send_message_batch(grpc_exec_ctx *exec_ctx, void *arg, } static void compress_start_transport_stream_op_batch( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *batch) { - call_data *calld = (call_data *)elem->call_data; + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* batch) { + call_data* calld = (call_data*)elem->call_data; GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); // Handle cancel_stream. if (batch->cancel_stream) { @@ -405,7 +406,7 @@ static void compress_start_transport_stream_op_batch( if (batch->send_initial_metadata) { GPR_ASSERT(calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN); bool has_compression_algorithm; - grpc_error *error = process_send_initial_metadata( + grpc_error* error = process_send_initial_metadata( exec_ctx, elem, batch->payload->send_initial_metadata.send_initial_metadata, &has_compression_algorithm); @@ -453,10 +454,10 @@ done: } /* Constructor for call_data */ -static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - const grpc_call_element_args *args) { - call_data *calld = (call_data *)elem->call_data; +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + const grpc_call_element_args* args) { + call_data* calld = (call_data*)elem->call_data; calld->call_combiner = args->call_combiner; calld->cancel_error = GRPC_ERROR_NONE; grpc_slice_buffer_init(&calld->slices); @@ -470,19 +471,19 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const grpc_call_final_info *final_info, - grpc_closure *ignored) { - call_data *calld = (call_data *)elem->call_data; +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* ignored) { + call_data* calld = (call_data*)elem->call_data; grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); GRPC_ERROR_UNREF(calld->cancel_error); } /* Constructor for channel_data */ -static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_channel_element_args *args) { - channel_data *channeld = (channel_data *)elem->channel_data; +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, + grpc_channel_element_args* args) { + channel_data* channeld = (channel_data*)elem->channel_data; /* Configuration for message compression */ channeld->enabled_algorithms_bitset = @@ -530,8 +531,8 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) {} +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) {} const grpc_channel_filter grpc_message_compress_filter = { compress_start_transport_stream_op_batch, |