aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/http/client/http_client_filter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/http/client/http_client_filter.cc')
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.cc48
1 files changed, 24 insertions, 24 deletions
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index 58aefd17c7..ae94ce47b9 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -20,9 +20,11 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <stdint.h>
#include <string.h>
#include "src/core/ext/filters/http/client/http_client_filter.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/b64.h"
#include "src/core/lib/slice/percent_encoding.h"
@@ -58,8 +60,9 @@ struct call_data {
// State for handling send_message ops.
grpc_transport_stream_op_batch* send_message_batch;
size_t send_message_bytes_read;
- grpc_byte_stream_cache send_message_cache;
- grpc_caching_byte_stream send_message_caching_stream;
+ grpc_core::ManualConstructor<grpc_core::ByteStreamCache> send_message_cache;
+ grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
+ send_message_caching_stream;
grpc_closure on_send_message_next_done;
grpc_closure* original_send_message_on_complete;
grpc_closure send_message_on_complete;
@@ -166,7 +169,7 @@ static void recv_trailing_metadata_on_complete(void* user_data,
static void send_message_on_complete(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
- grpc_byte_stream_cache_destroy(&calld->send_message_cache);
+ calld->send_message_cache.Destroy();
GRPC_CLOSURE_RUN(calld->original_send_message_on_complete,
GRPC_ERROR_REF(error));
}
@@ -175,8 +178,7 @@ static void send_message_on_complete(void* arg, grpc_error* error) {
// calld->send_message_bytes_read.
static grpc_error* pull_slice_from_send_message(call_data* calld) {
grpc_slice incoming_slice;
- grpc_error* error = grpc_byte_stream_pull(
- &calld->send_message_caching_stream.base, &incoming_slice);
+ grpc_error* error = calld->send_message_caching_stream->Pull(&incoming_slice);
if (error == GRPC_ERROR_NONE) {
calld->send_message_bytes_read += GRPC_SLICE_LENGTH(incoming_slice);
grpc_slice_unref_internal(incoming_slice);
@@ -186,24 +188,23 @@ static grpc_error* pull_slice_from_send_message(call_data* calld) {
// Reads as many slices as possible from the send_message byte stream.
// Upon successful return, if calld->send_message_bytes_read ==
-// calld->send_message_caching_stream.base.length, then we have completed
+// calld->send_message_caching_stream->length(), then we have completed
// reading from the byte stream; otherwise, an async read has been dispatched
// and on_send_message_next_done() will be invoked when it is complete.
static grpc_error* read_all_available_send_message_data(call_data* calld) {
- while (grpc_byte_stream_next(&calld->send_message_caching_stream.base,
- ~static_cast<size_t>(0),
- &calld->on_send_message_next_done)) {
+ while (calld->send_message_caching_stream->Next(
+ SIZE_MAX, &calld->on_send_message_next_done)) {
grpc_error* error = pull_slice_from_send_message(calld);
if (error != GRPC_ERROR_NONE) return error;
if (calld->send_message_bytes_read ==
- calld->send_message_caching_stream.base.length) {
+ calld->send_message_caching_stream->length()) {
break;
}
}
return GRPC_ERROR_NONE;
}
-// Async callback for grpc_byte_stream_next().
+// Async callback for ByteStream::Next().
static void on_send_message_next_done(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
@@ -222,7 +223,7 @@ static void on_send_message_next_done(void* arg, grpc_error* error) {
// here, then we know that all of the data was not available
// synchronously, so we were not able to do a cached call. Instead,
// we just reset the byte stream and then send down the batch as-is.
- grpc_caching_byte_stream_reset(&calld->send_message_caching_stream);
+ calld->send_message_caching_stream->Reset();
grpc_call_next_op(elem, calld->send_message_batch);
}
@@ -253,7 +254,7 @@ static grpc_error* update_path_for_get(grpc_call_element* elem,
size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
estimated_len++; /* for the '?' */
estimated_len += grpc_base64_estimate_encoded_size(
- batch->payload->send_message.send_message->length, true /* url_safe */,
+ batch->payload->send_message.send_message->length(), true /* url_safe */,
false /* multi_line */);
grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len);
/* memcopy individual pieces into this slice */
@@ -265,9 +266,9 @@ static grpc_error* update_path_for_get(grpc_call_element* elem,
write_ptr += GRPC_SLICE_LENGTH(path_slice);
*write_ptr++ = '?';
char* payload_bytes =
- slice_buffer_to_string(&calld->send_message_cache.cache_buffer);
+ slice_buffer_to_string(calld->send_message_cache->cache_buffer());
grpc_base64_encode_core(write_ptr, payload_bytes,
- batch->payload->send_message.send_message->length,
+ batch->payload->send_message.send_message->length(),
true /* url_safe */, false /* multi_line */);
gpr_free(payload_bytes);
/* remove trailing unused memory and add trailing 0 to terminate string */
@@ -326,15 +327,14 @@ static void hc_start_transport_stream_op_batch(
if (batch->send_message &&
(batch->payload->send_initial_metadata.send_initial_metadata_flags &
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
- batch->payload->send_message.send_message->length <
+ batch->payload->send_message.send_message->length() <
channeld->max_payload_size_for_get) {
calld->send_message_bytes_read = 0;
- grpc_byte_stream_cache_init(&calld->send_message_cache,
- batch->payload->send_message.send_message);
- grpc_caching_byte_stream_init(&calld->send_message_caching_stream,
- &calld->send_message_cache);
- batch->payload->send_message.send_message =
- &calld->send_message_caching_stream.base;
+ calld->send_message_cache.Init(
+ std::move(batch->payload->send_message.send_message));
+ calld->send_message_caching_stream.Init(calld->send_message_cache.get());
+ batch->payload->send_message.send_message.reset(
+ calld->send_message_caching_stream.get());
calld->original_send_message_on_complete = batch->on_complete;
batch->on_complete = &calld->send_message_on_complete;
calld->send_message_batch = batch;
@@ -342,12 +342,12 @@ static void hc_start_transport_stream_op_batch(
if (error != GRPC_ERROR_NONE) goto done;
// If all the data has been read, then we can use GET.
if (calld->send_message_bytes_read ==
- calld->send_message_caching_stream.base.length) {
+ calld->send_message_caching_stream->length()) {
method = GRPC_MDELEM_METHOD_GET;
error = update_path_for_get(elem, batch);
if (error != GRPC_ERROR_NONE) goto done;
batch->send_message = false;
- grpc_byte_stream_destroy(&calld->send_message_caching_stream.base);
+ calld->send_message_caching_stream->Orphan();
} else {
// Not all data is available. The batch will be sent down
// asynchronously in on_send_message_next_done().