aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/http/message_compress/message_compress_filter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/http/message_compress/message_compress_filter.cc')
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.cc45
1 files changed, 21 insertions, 24 deletions
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc
index efe0085c5b..f8f478b6c0 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc
@@ -32,6 +32,7 @@
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
@@ -62,7 +63,8 @@ struct call_data {
grpc_closure start_send_message_batch_in_call_combiner;
grpc_transport_stream_op_batch* send_message_batch;
grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
- grpc_slice_buffer_stream replacement_stream;
+ grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream>
+ replacement_stream;
grpc_closure* original_send_message_on_complete;
grpc_closure send_message_on_complete;
grpc_closure on_send_message_next_done;
@@ -220,7 +222,7 @@ static void finish_send_message(grpc_call_element* elem) {
grpc_slice_buffer tmp;
grpc_slice_buffer_init(&tmp);
uint32_t send_flags =
- calld->send_message_batch->payload->send_message.send_message->flags;
+ calld->send_message_batch->payload->send_message.send_message->flags();
bool did_compress = grpc_msg_compress(calld->message_compression_algorithm,
&calld->slices, &tmp);
if (did_compress) {
@@ -232,7 +234,7 @@ static void finish_send_message(grpc_call_element* elem) {
static_cast<float>(before_size);
GPR_ASSERT(grpc_message_compression_algorithm_name(
calld->message_compression_algorithm, &algo_name));
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
" bytes (%.2f%% savings)",
algo_name, before_size, after_size, 100 * savings_ratio);
@@ -244,7 +246,7 @@ static void finish_send_message(grpc_call_element* elem) {
const char* algo_name;
GPR_ASSERT(grpc_message_compression_algorithm_name(
calld->message_compression_algorithm, &algo_name));
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"Algorithm '%s' enabled but decided not to compress. Input size: "
"%" PRIuPTR,
algo_name, calld->slices.length);
@@ -253,12 +255,9 @@ static void finish_send_message(grpc_call_element* elem) {
grpc_slice_buffer_destroy_internal(&tmp);
// Swap out the original byte stream with our new one and send the
// batch down.
- grpc_byte_stream_destroy(
- calld->send_message_batch->payload->send_message.send_message);
- grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
- send_flags);
- calld->send_message_batch->payload->send_message.send_message =
- &calld->replacement_stream.base;
+ calld->replacement_stream.Init(&calld->slices, send_flags);
+ calld->send_message_batch->payload->send_message.send_message.reset(
+ calld->replacement_stream.get());
calld->original_send_message_on_complete =
calld->send_message_batch->on_complete;
calld->send_message_batch->on_complete = &calld->send_message_on_complete;
@@ -278,9 +277,9 @@ static void fail_send_message_batch_in_call_combiner(void* arg,
// Pulls a slice from the send_message byte stream and adds it to calld->slices.
static grpc_error* pull_slice_from_send_message(call_data* calld) {
grpc_slice incoming_slice;
- grpc_error* error = grpc_byte_stream_pull(
- calld->send_message_batch->payload->send_message.send_message,
- &incoming_slice);
+ grpc_error* error =
+ calld->send_message_batch->payload->send_message.send_message->Pull(
+ &incoming_slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&calld->slices, incoming_slice);
}
@@ -289,12 +288,11 @@ static grpc_error* pull_slice_from_send_message(call_data* calld) {
// Reads as many slices as possible from the send_message byte stream.
// If all data has been read, invokes finish_send_message(). Otherwise,
-// an async call to grpc_byte_stream_next() has been started, which will
+// an async call to ByteStream::Next() has been started, which will
// eventually result in calling on_send_message_next_done().
static void continue_reading_send_message(grpc_call_element* elem) {
call_data* calld = static_cast<call_data*>(elem->call_data);
- while (grpc_byte_stream_next(
- calld->send_message_batch->payload->send_message.send_message,
+ while (calld->send_message_batch->payload->send_message.send_message->Next(
~static_cast<size_t>(0), &calld->on_send_message_next_done)) {
grpc_error* error = pull_slice_from_send_message(calld);
if (error != GRPC_ERROR_NONE) {
@@ -303,15 +301,15 @@ static void continue_reading_send_message(grpc_call_element* elem) {
GRPC_ERROR_UNREF(error);
return;
}
- if (calld->slices.length ==
- calld->send_message_batch->payload->send_message.send_message->length) {
+ if (calld->slices.length == calld->send_message_batch->payload->send_message
+ .send_message->length()) {
finish_send_message(elem);
break;
}
}
}
-// Async callback for grpc_byte_stream_next().
+// Async callback for ByteStream::Next().
static void on_send_message_next_done(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
@@ -328,7 +326,7 @@ static void on_send_message_next_done(void* arg, grpc_error* error) {
return;
}
if (calld->slices.length ==
- calld->send_message_batch->payload->send_message.send_message->length) {
+ calld->send_message_batch->payload->send_message.send_message->length()) {
finish_send_message(elem);
} else {
continue_reading_send_message(elem);
@@ -340,7 +338,8 @@ static void start_send_message_batch(void* arg, grpc_error* unused) {
call_data* calld = static_cast<call_data*>(elem->call_data);
if (skip_compression(
elem,
- calld->send_message_batch->payload->send_message.send_message->flags,
+ calld->send_message_batch->payload->send_message.send_message
+ ->flags(),
calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) {
send_message_batch_continue(elem);
} else {
@@ -365,9 +364,7 @@ static void compress_start_transport_stream_op_batch(
grpc_schedule_on_exec_ctx),
GRPC_ERROR_REF(calld->cancel_error), "failing send_message op");
} else {
- grpc_byte_stream_shutdown(
-
- calld->send_message_batch->payload->send_message.send_message,
+ calld->send_message_batch->payload->send_message.send_message->Shutdown(
GRPC_ERROR_REF(calld->cancel_error));
}
}