diff options
Diffstat (limited to 'src/core/ext/filters/http/message_compress/message_compress_filter.cc')
-rw-r--r-- | src/core/ext/filters/http/message_compress/message_compress_filter.cc | 119 |
1 files changed, 60 insertions, 59 deletions
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index bacadb04b3..6c284e1c3d 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -45,7 +45,7 @@ typedef enum { } initial_metadata_state; typedef struct call_data { - grpc_call_combiner *call_combiner; + grpc_call_combiner* call_combiner; grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem stream_compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; @@ -54,12 +54,12 @@ typedef struct call_data { * metadata, or by the channel's default compression settings. */ grpc_compression_algorithm compression_algorithm; initial_metadata_state send_initial_metadata_state; - grpc_error *cancel_error; + grpc_error* cancel_error; grpc_closure start_send_message_batch_in_call_combiner; - grpc_transport_stream_op_batch *send_message_batch; + grpc_transport_stream_op_batch* send_message_batch; grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_slice_buffer_stream replacement_stream; - grpc_closure *original_send_message_on_complete; + grpc_closure* original_send_message_on_complete; grpc_closure send_message_on_complete; grpc_closure on_send_message_next_done; } call_data; @@ -80,10 +80,10 @@ typedef struct channel_data { uint32_t supported_stream_compression_algorithms; } channel_data; -static bool skip_compression(grpc_call_element *elem, uint32_t flags, +static bool skip_compression(grpc_call_element* elem, uint32_t flags, bool has_compression_algorithm) { - call_data *calld = (call_data *)elem->call_data; - channel_data *channeld = (channel_data *)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; + channel_data* channeld = (channel_data*)elem->channel_data; if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) { return true; @@ -99,14 +99,14 @@ static bool skip_compression(grpc_call_element *elem, uint32_t flags, } /** Filter initial metadata */ -static grpc_error *process_send_initial_metadata( - grpc_call_element *elem, grpc_metadata_batch *initial_metadata, - bool *has_compression_algorithm) GRPC_MUST_USE_RESULT; -static grpc_error *process_send_initial_metadata( - grpc_call_element *elem, grpc_metadata_batch *initial_metadata, - bool *has_compression_algorithm) { - call_data *calld = (call_data *)elem->call_data; - channel_data *channeld = (channel_data *)elem->channel_data; +static grpc_error* process_send_initial_metadata( + grpc_call_element* elem, grpc_metadata_batch* initial_metadata, + bool* has_compression_algorithm) GRPC_MUST_USE_RESULT; +static grpc_error* process_send_initial_metadata( + grpc_call_element* elem, grpc_metadata_batch* initial_metadata, + bool* has_compression_algorithm) { + call_data* calld = (call_data*)elem->call_data; + channel_data* channeld = (channel_data*)elem->channel_data; *has_compression_algorithm = false; grpc_stream_compression_algorithm stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; @@ -116,7 +116,7 @@ static grpc_error *process_send_initial_metadata( initial_metadata->idx.named.grpc_internal_stream_encoding_request->md; if (!grpc_stream_compression_algorithm_parse( GRPC_MDVALUE(md), &stream_compression_algorithm)) { - char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, "Invalid stream compression algorithm: '%s' (unknown). Ignoring.", val); @@ -125,7 +125,7 @@ static grpc_error *process_send_initial_metadata( } if (!GPR_BITGET(channeld->enabled_stream_compression_algorithms_bitset, stream_compression_algorithm)) { - char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log( GPR_ERROR, "Invalid stream compression algorithm: '%s' (previously disabled). " @@ -151,7 +151,7 @@ static grpc_error *process_send_initial_metadata( initial_metadata->idx.named.grpc_internal_encoding_request->md; if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md), &calld->compression_algorithm)) { - char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s' (unknown). Ignoring.", val); gpr_free(val); @@ -176,7 +176,7 @@ static grpc_error *process_send_initial_metadata( *has_compression_algorithm = true; } - grpc_error *error = GRPC_ERROR_NONE; + grpc_error* error = GRPC_ERROR_NONE; /* hint compression algorithm */ if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) { error = grpc_metadata_batch_add_tail( @@ -209,27 +209,27 @@ static grpc_error *process_send_initial_metadata( return error; } -static void send_message_on_complete(void *arg, grpc_error *error) { - grpc_call_element *elem = (grpc_call_element *)arg; - call_data *calld = (call_data *)elem->call_data; +static void send_message_on_complete(void* arg, grpc_error* error) { + grpc_call_element* elem = (grpc_call_element*)arg; + call_data* calld = (call_data*)elem->call_data; grpc_slice_buffer_reset_and_unref_internal(&calld->slices); GRPC_CLOSURE_RUN(calld->original_send_message_on_complete, GRPC_ERROR_REF(error)); } -static void send_message_batch_continue(grpc_call_element *elem) { - call_data *calld = (call_data *)elem->call_data; +static void send_message_batch_continue(grpc_call_element* elem) { + call_data* calld = (call_data*)elem->call_data; // Note: The call to grpc_call_next_op() results in yielding the // call combiner, so we need to clear calld->send_message_batch // before we do that. - grpc_transport_stream_op_batch *send_message_batch = + grpc_transport_stream_op_batch* send_message_batch = calld->send_message_batch; calld->send_message_batch = NULL; grpc_call_next_op(elem, send_message_batch); } -static void finish_send_message(grpc_call_element *elem) { - call_data *calld = (call_data *)elem->call_data; +static void finish_send_message(grpc_call_element* elem) { + call_data* calld = (call_data*)elem->call_data; // Compress the data if appropriate. grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); @@ -239,21 +239,22 @@ static void finish_send_message(grpc_call_element *elem) { grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { if (GRPC_TRACER_ON(grpc_compression_trace)) { - const char *algo_name; + const char* algo_name; const size_t before_size = calld->slices.length; const size_t after_size = tmp.length; const float savings_ratio = 1.0f - (float)after_size / (float)before_size; GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, &algo_name)); - gpr_log(GPR_DEBUG, "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR - " bytes (%.2f%% savings)", + gpr_log(GPR_DEBUG, + "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR + " bytes (%.2f%% savings)", algo_name, before_size, after_size, 100 * savings_ratio); } grpc_slice_buffer_swap(&calld->slices, &tmp); send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { if (GRPC_TRACER_ON(grpc_compression_trace)) { - const char *algo_name; + const char* algo_name; GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, &algo_name)); gpr_log(GPR_DEBUG, @@ -277,9 +278,9 @@ static void finish_send_message(grpc_call_element *elem) { send_message_batch_continue(elem); } -static void fail_send_message_batch_in_call_combiner(void *arg, - grpc_error *error) { - call_data *calld = (call_data *)arg; +static void fail_send_message_batch_in_call_combiner(void* arg, + grpc_error* error) { + call_data* calld = (call_data*)arg; if (calld->send_message_batch != NULL) { grpc_transport_stream_op_batch_finish_with_failure( calld->send_message_batch, GRPC_ERROR_REF(error), calld->call_combiner); @@ -288,9 +289,9 @@ static void fail_send_message_batch_in_call_combiner(void *arg, } // Pulls a slice from the send_message byte stream and adds it to calld->slices. -static grpc_error *pull_slice_from_send_message(call_data *calld) { +static grpc_error* pull_slice_from_send_message(call_data* calld) { grpc_slice incoming_slice; - grpc_error *error = grpc_byte_stream_pull( + grpc_error* error = grpc_byte_stream_pull( calld->send_message_batch->payload->send_message.send_message, &incoming_slice); if (error == GRPC_ERROR_NONE) { @@ -303,12 +304,12 @@ static grpc_error *pull_slice_from_send_message(call_data *calld) { // If all data has been read, invokes finish_send_message(). Otherwise, // an async call to grpc_byte_stream_next() has been started, which will // eventually result in calling on_send_message_next_done(). -static void continue_reading_send_message(grpc_call_element *elem) { - call_data *calld = (call_data *)elem->call_data; +static void continue_reading_send_message(grpc_call_element* elem) { + call_data* calld = (call_data*)elem->call_data; while (grpc_byte_stream_next( calld->send_message_batch->payload->send_message.send_message, ~(size_t)0, &calld->on_send_message_next_done)) { - grpc_error *error = pull_slice_from_send_message(calld); + grpc_error* error = pull_slice_from_send_message(calld); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. fail_send_message_batch_in_call_combiner(calld, error); @@ -324,9 +325,9 @@ static void continue_reading_send_message(grpc_call_element *elem) { } // Async callback for grpc_byte_stream_next(). -static void on_send_message_next_done(void *arg, grpc_error *error) { - grpc_call_element *elem = (grpc_call_element *)arg; - call_data *calld = (call_data *)elem->call_data; +static void on_send_message_next_done(void* arg, grpc_error* error) { + grpc_call_element* elem = (grpc_call_element*)arg; + call_data* calld = (call_data*)elem->call_data; if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. fail_send_message_batch_in_call_combiner(calld, error); @@ -347,9 +348,9 @@ static void on_send_message_next_done(void *arg, grpc_error *error) { } } -static void start_send_message_batch(void *arg, grpc_error *unused) { - grpc_call_element *elem = (grpc_call_element *)arg; - call_data *calld = (call_data *)elem->call_data; +static void start_send_message_batch(void* arg, grpc_error* unused) { + grpc_call_element* elem = (grpc_call_element*)arg; + call_data* calld = (call_data*)elem->call_data; if (skip_compression( elem, calld->send_message_batch->payload->send_message.send_message->flags, @@ -361,8 +362,8 @@ static void start_send_message_batch(void *arg, grpc_error *unused) { } static void compress_start_transport_stream_op_batch( - grpc_call_element *elem, grpc_transport_stream_op_batch *batch) { - call_data *calld = (call_data *)elem->call_data; + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + call_data* calld = (call_data*)elem->call_data; GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); // Handle cancel_stream. if (batch->cancel_stream) { @@ -391,7 +392,7 @@ static void compress_start_transport_stream_op_batch( if (batch->send_initial_metadata) { GPR_ASSERT(calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN); bool has_compression_algorithm; - grpc_error *error = process_send_initial_metadata( + grpc_error* error = process_send_initial_metadata( elem, batch->payload->send_initial_metadata.send_initial_metadata, &has_compression_algorithm); if (error != GRPC_ERROR_NONE) { @@ -438,9 +439,9 @@ done: } /* Constructor for call_data */ -static grpc_error *init_call_elem(grpc_call_element *elem, - const grpc_call_element_args *args) { - call_data *calld = (call_data *)elem->call_data; +static grpc_error* init_call_elem(grpc_call_element* elem, + const grpc_call_element_args* args) { + call_data* calld = (call_data*)elem->call_data; calld->call_combiner = args->call_combiner; calld->cancel_error = GRPC_ERROR_NONE; grpc_slice_buffer_init(&calld->slices); @@ -454,18 +455,18 @@ static grpc_error *init_call_elem(grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem, - const grpc_call_final_info *final_info, - grpc_closure *ignored) { - call_data *calld = (call_data *)elem->call_data; +static void destroy_call_elem(grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* ignored) { + call_data* calld = (call_data*)elem->call_data; grpc_slice_buffer_destroy_internal(&calld->slices); GRPC_ERROR_UNREF(calld->cancel_error); } /* Constructor for channel_data */ -static grpc_error *init_channel_elem(grpc_channel_element *elem, - grpc_channel_element_args *args) { - channel_data *channeld = (channel_data *)elem->channel_data; +static grpc_error* init_channel_elem(grpc_channel_element* elem, + grpc_channel_element_args* args) { + channel_data* channeld = (channel_data*)elem->channel_data; /* Configuration for message compression */ channeld->enabled_algorithms_bitset = @@ -513,7 +514,7 @@ static grpc_error *init_channel_elem(grpc_channel_element *elem, } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element *elem) {} +static void destroy_channel_elem(grpc_channel_element* elem) {} const grpc_channel_filter grpc_message_compress_filter = { compress_start_transport_stream_op_batch, |