diff options
Diffstat (limited to 'src/core/ext/filters/http/message_compress/message_compress_filter.cc')
-rw-r--r-- | src/core/ext/filters/http/message_compress/message_compress_filter.cc | 45 |
1 files changed, 21 insertions, 24 deletions
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index efe0085c5b..f8f478b6c0 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -32,6 +32,7 @@ #include "src/core/lib/compression/compression_internal.h" #include "src/core/lib/compression/message_compress.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -62,7 +63,8 @@ struct call_data { grpc_closure start_send_message_batch_in_call_combiner; grpc_transport_stream_op_batch* send_message_batch; grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ - grpc_slice_buffer_stream replacement_stream; + grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> + replacement_stream; grpc_closure* original_send_message_on_complete; grpc_closure send_message_on_complete; grpc_closure on_send_message_next_done; @@ -220,7 +222,7 @@ static void finish_send_message(grpc_call_element* elem) { grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); uint32_t send_flags = - calld->send_message_batch->payload->send_message.send_message->flags; + calld->send_message_batch->payload->send_message.send_message->flags(); bool did_compress = grpc_msg_compress(calld->message_compression_algorithm, &calld->slices, &tmp); if (did_compress) { @@ -232,7 +234,7 @@ static void finish_send_message(grpc_call_element* elem) { static_cast<float>(before_size); GPR_ASSERT(grpc_message_compression_algorithm_name( calld->message_compression_algorithm, &algo_name)); - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR " bytes (%.2f%% savings)", algo_name, before_size, after_size, 100 * savings_ratio); @@ -244,7 +246,7 @@ static void finish_send_message(grpc_call_element* elem) { const char* algo_name; GPR_ASSERT(grpc_message_compression_algorithm_name( calld->message_compression_algorithm, &algo_name)); - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "Algorithm '%s' enabled but decided not to compress. Input size: " "%" PRIuPTR, algo_name, calld->slices.length); @@ -253,12 +255,9 @@ static void finish_send_message(grpc_call_element* elem) { grpc_slice_buffer_destroy_internal(&tmp); // Swap out the original byte stream with our new one and send the // batch down. - grpc_byte_stream_destroy( - calld->send_message_batch->payload->send_message.send_message); - grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, - send_flags); - calld->send_message_batch->payload->send_message.send_message = - &calld->replacement_stream.base; + calld->replacement_stream.Init(&calld->slices, send_flags); + calld->send_message_batch->payload->send_message.send_message.reset( + calld->replacement_stream.get()); calld->original_send_message_on_complete = calld->send_message_batch->on_complete; calld->send_message_batch->on_complete = &calld->send_message_on_complete; @@ -278,9 +277,9 @@ static void fail_send_message_batch_in_call_combiner(void* arg, // Pulls a slice from the send_message byte stream and adds it to calld->slices. static grpc_error* pull_slice_from_send_message(call_data* calld) { grpc_slice incoming_slice; - grpc_error* error = grpc_byte_stream_pull( - calld->send_message_batch->payload->send_message.send_message, - &incoming_slice); + grpc_error* error = + calld->send_message_batch->payload->send_message.send_message->Pull( + &incoming_slice); if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&calld->slices, incoming_slice); } @@ -289,12 +288,11 @@ static grpc_error* pull_slice_from_send_message(call_data* calld) { // Reads as many slices as possible from the send_message byte stream. // If all data has been read, invokes finish_send_message(). Otherwise, -// an async call to grpc_byte_stream_next() has been started, which will +// an async call to ByteStream::Next() has been started, which will // eventually result in calling on_send_message_next_done(). static void continue_reading_send_message(grpc_call_element* elem) { call_data* calld = static_cast<call_data*>(elem->call_data); - while (grpc_byte_stream_next( - calld->send_message_batch->payload->send_message.send_message, + while (calld->send_message_batch->payload->send_message.send_message->Next( ~static_cast<size_t>(0), &calld->on_send_message_next_done)) { grpc_error* error = pull_slice_from_send_message(calld); if (error != GRPC_ERROR_NONE) { @@ -303,15 +301,15 @@ static void continue_reading_send_message(grpc_call_element* elem) { GRPC_ERROR_UNREF(error); return; } - if (calld->slices.length == - calld->send_message_batch->payload->send_message.send_message->length) { + if (calld->slices.length == calld->send_message_batch->payload->send_message + .send_message->length()) { finish_send_message(elem); break; } } } -// Async callback for grpc_byte_stream_next(). +// Async callback for ByteStream::Next(). static void on_send_message_next_done(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); call_data* calld = static_cast<call_data*>(elem->call_data); @@ -328,7 +326,7 @@ static void on_send_message_next_done(void* arg, grpc_error* error) { return; } if (calld->slices.length == - calld->send_message_batch->payload->send_message.send_message->length) { + calld->send_message_batch->payload->send_message.send_message->length()) { finish_send_message(elem); } else { continue_reading_send_message(elem); @@ -340,7 +338,8 @@ static void start_send_message_batch(void* arg, grpc_error* unused) { call_data* calld = static_cast<call_data*>(elem->call_data); if (skip_compression( elem, - calld->send_message_batch->payload->send_message.send_message->flags, + calld->send_message_batch->payload->send_message.send_message + ->flags(), calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) { send_message_batch_continue(elem); } else { @@ -365,9 +364,7 @@ static void compress_start_transport_stream_op_batch( grpc_schedule_on_exec_ctx), GRPC_ERROR_REF(calld->cancel_error), "failing send_message op"); } else { - grpc_byte_stream_shutdown( - - calld->send_message_batch->payload->send_message.send_message, + calld->send_message_batch->payload->send_message.send_message->Shutdown( GRPC_ERROR_REF(calld->cancel_error)); } } |