aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/http/message_compress/message_compress_filter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/http/message_compress/message_compress_filter.cc')
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.cc145
1 files changed, 73 insertions, 72 deletions
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc
index f785e1355d..949ff917d6 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc
@@ -45,7 +45,7 @@ typedef enum {
} initial_metadata_state;
typedef struct call_data {
- grpc_call_combiner *call_combiner;
+ grpc_call_combiner* call_combiner;
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem stream_compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage;
@@ -54,12 +54,12 @@ typedef struct call_data {
* metadata, or by the channel's default compression settings. */
grpc_compression_algorithm compression_algorithm;
initial_metadata_state send_initial_metadata_state;
- grpc_error *cancel_error;
+ grpc_error* cancel_error;
grpc_closure start_send_message_batch_in_call_combiner;
- grpc_transport_stream_op_batch *send_message_batch;
+ grpc_transport_stream_op_batch* send_message_batch;
grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_slice_buffer_stream replacement_stream;
- grpc_closure *original_send_message_on_complete;
+ grpc_closure* original_send_message_on_complete;
grpc_closure send_message_on_complete;
grpc_closure on_send_message_next_done;
} call_data;
@@ -80,10 +80,10 @@ typedef struct channel_data {
uint32_t supported_stream_compression_algorithms;
} channel_data;
-static bool skip_compression(grpc_call_element *elem, uint32_t flags,
+static bool skip_compression(grpc_call_element* elem, uint32_t flags,
bool has_compression_algorithm) {
- call_data *calld = (call_data *)elem->call_data;
- channel_data *channeld = (channel_data *)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
+ channel_data* channeld = (channel_data*)elem->channel_data;
if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) {
return true;
@@ -99,15 +99,15 @@ static bool skip_compression(grpc_call_element *elem, uint32_t flags,
}
/** Filter initial metadata */
-static grpc_error *process_send_initial_metadata(
- grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_metadata_batch *initial_metadata,
- bool *has_compression_algorithm) GRPC_MUST_USE_RESULT;
-static grpc_error *process_send_initial_metadata(
- grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_metadata_batch *initial_metadata, bool *has_compression_algorithm) {
- call_data *calld = (call_data *)elem->call_data;
- channel_data *channeld = (channel_data *)elem->channel_data;
+static grpc_error* process_send_initial_metadata(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_metadata_batch* initial_metadata,
+ bool* has_compression_algorithm) GRPC_MUST_USE_RESULT;
+static grpc_error* process_send_initial_metadata(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_metadata_batch* initial_metadata, bool* has_compression_algorithm) {
+ call_data* calld = (call_data*)elem->call_data;
+ channel_data* channeld = (channel_data*)elem->channel_data;
*has_compression_algorithm = false;
grpc_stream_compression_algorithm stream_compression_algorithm =
GRPC_STREAM_COMPRESS_NONE;
@@ -117,7 +117,7 @@ static grpc_error *process_send_initial_metadata(
initial_metadata->idx.named.grpc_internal_stream_encoding_request->md;
if (!grpc_stream_compression_algorithm_parse(
GRPC_MDVALUE(md), &stream_compression_algorithm)) {
- char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
gpr_log(GPR_ERROR,
"Invalid stream compression algorithm: '%s' (unknown). Ignoring.",
val);
@@ -126,7 +126,7 @@ static grpc_error *process_send_initial_metadata(
}
if (!GPR_BITGET(channeld->enabled_stream_compression_algorithms_bitset,
stream_compression_algorithm)) {
- char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
gpr_log(
GPR_ERROR,
"Invalid stream compression algorithm: '%s' (previously disabled). "
@@ -152,7 +152,7 @@ static grpc_error *process_send_initial_metadata(
initial_metadata->idx.named.grpc_internal_encoding_request->md;
if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md),
&calld->compression_algorithm)) {
- char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
gpr_log(GPR_ERROR,
"Invalid compression algorithm: '%s' (unknown). Ignoring.", val);
gpr_free(val);
@@ -177,7 +177,7 @@ static grpc_error *process_send_initial_metadata(
*has_compression_algorithm = true;
}
- grpc_error *error = GRPC_ERROR_NONE;
+ grpc_error* error = GRPC_ERROR_NONE;
/* hint compression algorithm */
if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) {
error = grpc_metadata_batch_add_tail(
@@ -211,30 +211,30 @@ static grpc_error *process_send_initial_metadata(
return error;
}
-static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_call_element *elem = (grpc_call_element *)arg;
- call_data *calld = (call_data *)elem->call_data;
+static void send_message_on_complete(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = (grpc_call_element*)arg;
+ call_data* calld = (call_data*)elem->call_data;
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete,
GRPC_ERROR_REF(error));
}
-static void send_message_batch_continue(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- call_data *calld = (call_data *)elem->call_data;
+static void send_message_batch_continue(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem) {
+ call_data* calld = (call_data*)elem->call_data;
// Note: The call to grpc_call_next_op() results in yielding the
// call combiner, so we need to clear calld->send_message_batch
// before we do that.
- grpc_transport_stream_op_batch *send_message_batch =
+ grpc_transport_stream_op_batch* send_message_batch =
calld->send_message_batch;
calld->send_message_batch = NULL;
grpc_call_next_op(exec_ctx, elem, send_message_batch);
}
-static void finish_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- call_data *calld = (call_data *)elem->call_data;
+static void finish_send_message(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem) {
+ call_data* calld = (call_data*)elem->call_data;
// Compress the data if appropriate.
grpc_slice_buffer tmp;
grpc_slice_buffer_init(&tmp);
@@ -244,21 +244,22 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
&calld->slices, &tmp);
if (did_compress) {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
- const char *algo_name;
+ const char* algo_name;
const size_t before_size = calld->slices.length;
const size_t after_size = tmp.length;
const float savings_ratio = 1.0f - (float)after_size / (float)before_size;
GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
&algo_name));
- gpr_log(GPR_DEBUG, "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
- " bytes (%.2f%% savings)",
+ gpr_log(GPR_DEBUG,
+ "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
+ " bytes (%.2f%% savings)",
algo_name, before_size, after_size, 100 * savings_ratio);
}
grpc_slice_buffer_swap(&calld->slices, &tmp);
send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
} else {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
- const char *algo_name;
+ const char* algo_name;
GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
&algo_name));
gpr_log(GPR_DEBUG,
@@ -282,10 +283,10 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
send_message_batch_continue(exec_ctx, elem);
}
-static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
- void *arg,
- grpc_error *error) {
- call_data *calld = (call_data *)arg;
+static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx* exec_ctx,
+ void* arg,
+ grpc_error* error) {
+ call_data* calld = (call_data*)arg;
if (calld->send_message_batch != NULL) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, calld->send_message_batch, GRPC_ERROR_REF(error),
@@ -295,10 +296,10 @@ static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
}
// Pulls a slice from the send_message byte stream and adds it to calld->slices.
-static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
- call_data *calld) {
+static grpc_error* pull_slice_from_send_message(grpc_exec_ctx* exec_ctx,
+ call_data* calld) {
grpc_slice incoming_slice;
- grpc_error *error = grpc_byte_stream_pull(
+ grpc_error* error = grpc_byte_stream_pull(
exec_ctx, calld->send_message_batch->payload->send_message.send_message,
&incoming_slice);
if (error == GRPC_ERROR_NONE) {
@@ -311,13 +312,13 @@ static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
// If all data has been read, invokes finish_send_message(). Otherwise,
// an async call to grpc_byte_stream_next() has been started, which will
// eventually result in calling on_send_message_next_done().
-static void continue_reading_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- call_data *calld = (call_data *)elem->call_data;
+static void continue_reading_send_message(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem) {
+ call_data* calld = (call_data*)elem->call_data;
while (grpc_byte_stream_next(
exec_ctx, calld->send_message_batch->payload->send_message.send_message,
~(size_t)0, &calld->on_send_message_next_done)) {
- grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
+ grpc_error* error = pull_slice_from_send_message(exec_ctx, calld);
if (error != GRPC_ERROR_NONE) {
// Closure callback; does not take ownership of error.
fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
@@ -333,10 +334,10 @@ static void continue_reading_send_message(grpc_exec_ctx *exec_ctx,
}
// Async callback for grpc_byte_stream_next().
-static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_call_element *elem = (grpc_call_element *)arg;
- call_data *calld = (call_data *)elem->call_data;
+static void on_send_message_next_done(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = (grpc_call_element*)arg;
+ call_data* calld = (call_data*)elem->call_data;
if (error != GRPC_ERROR_NONE) {
// Closure callback; does not take ownership of error.
fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
@@ -357,10 +358,10 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
}
}
-static void start_send_message_batch(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *unused) {
- grpc_call_element *elem = (grpc_call_element *)arg;
- call_data *calld = (call_data *)elem->call_data;
+static void start_send_message_batch(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* unused) {
+ grpc_call_element* elem = (grpc_call_element*)arg;
+ call_data* calld = (call_data*)elem->call_data;
if (skip_compression(
elem,
calld->send_message_batch->payload->send_message.send_message->flags,
@@ -372,9 +373,9 @@ static void start_send_message_batch(grpc_exec_ctx *exec_ctx, void *arg,
}
static void compress_start_transport_stream_op_batch(
- grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *batch) {
- call_data *calld = (call_data *)elem->call_data;
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op_batch* batch) {
+ call_data* calld = (call_data*)elem->call_data;
GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
// Handle cancel_stream.
if (batch->cancel_stream) {
@@ -405,7 +406,7 @@ static void compress_start_transport_stream_op_batch(
if (batch->send_initial_metadata) {
GPR_ASSERT(calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN);
bool has_compression_algorithm;
- grpc_error *error = process_send_initial_metadata(
+ grpc_error* error = process_send_initial_metadata(
exec_ctx, elem,
batch->payload->send_initial_metadata.send_initial_metadata,
&has_compression_algorithm);
@@ -453,10 +454,10 @@ done:
}
/* Constructor for call_data */
-static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- const grpc_call_element_args *args) {
- call_data *calld = (call_data *)elem->call_data;
+static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ call_data* calld = (call_data*)elem->call_data;
calld->call_combiner = args->call_combiner;
calld->cancel_error = GRPC_ERROR_NONE;
grpc_slice_buffer_init(&calld->slices);
@@ -470,19 +471,19 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_final_info *final_info,
- grpc_closure *ignored) {
- call_data *calld = (call_data *)elem->call_data;
+static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* ignored) {
+ call_data* calld = (call_data*)elem->call_data;
grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
GRPC_ERROR_UNREF(calld->cancel_error);
}
/* Constructor for channel_data */
-static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_channel_element_args *args) {
- channel_data *channeld = (channel_data *)elem->channel_data;
+static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ channel_data* channeld = (channel_data*)elem->channel_data;
/* Configuration for message compression */
channeld->enabled_algorithms_bitset =
@@ -530,8 +531,8 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
}
/* Destructor for channel data */
-static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem) {}
+static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem) {}
const grpc_channel_filter grpc_message_compress_filter = {
compress_start_transport_stream_op_batch,