aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/gpr/log_linux.cc1
-rw-r--r--src/core/lib/security/transport/client_auth_filter.cc40
-rw-r--r--src/core/lib/transport/byte_stream.cc35
-rw-r--r--src/core/lib/transport/byte_stream.h3
4 files changed, 40 insertions, 39 deletions
diff --git a/src/core/lib/gpr/log_linux.cc b/src/core/lib/gpr/log_linux.cc
index d743eedf38..e4417d9d5d 100644
--- a/src/core/lib/gpr/log_linux.cc
+++ b/src/core/lib/gpr/log_linux.cc
@@ -32,6 +32,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
+#include <inttypes.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
diff --git a/src/core/lib/security/transport/client_auth_filter.cc b/src/core/lib/security/transport/client_auth_filter.cc
index d6ca8ee8f8..048e390a71 100644
--- a/src/core/lib/security/transport/client_auth_filter.cc
+++ b/src/core/lib/security/transport/client_auth_filter.cc
@@ -45,8 +45,6 @@ struct call_data {
grpc_call_stack* owning_call;
grpc_call_combiner* call_combiner;
grpc_call_credentials* creds;
- bool have_host;
- bool have_method;
grpc_slice host;
grpc_slice method;
/* pollset{_set} bound to this call; if we need to make external
@@ -294,27 +292,15 @@ static void auth_start_transport_stream_op_batch(
}
if (batch->send_initial_metadata) {
- for (grpc_linked_mdelem* l = batch->payload->send_initial_metadata
- .send_initial_metadata->list.head;
- l != nullptr; l = l->next) {
- grpc_mdelem md = l->md;
- /* Pointer comparison is OK for md_elems created from the same context.
- */
- if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_AUTHORITY)) {
- if (calld->have_host) {
- grpc_slice_unref_internal(calld->host);
- }
- calld->host = grpc_slice_ref_internal(GRPC_MDVALUE(md));
- calld->have_host = true;
- } else if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) {
- if (calld->have_method) {
- grpc_slice_unref_internal(calld->method);
- }
- calld->method = grpc_slice_ref_internal(GRPC_MDVALUE(md));
- calld->have_method = true;
- }
+ grpc_metadata_batch* metadata =
+ batch->payload->send_initial_metadata.send_initial_metadata;
+ if (metadata->idx.named.path != nullptr) {
+ calld->method =
+ grpc_slice_ref_internal(GRPC_MDVALUE(metadata->idx.named.path->md));
}
- if (calld->have_host) {
+ if (metadata->idx.named.authority != nullptr) {
+ calld->host = grpc_slice_ref_internal(
+ GRPC_MDVALUE(metadata->idx.named.authority->md));
batch->handler_private.extra_arg = elem;
GRPC_CALL_STACK_REF(calld->owning_call, "check_call_host");
GRPC_CLOSURE_INIT(&calld->async_result_closure, on_host_checked, batch,
@@ -351,6 +337,8 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
call_data* calld = static_cast<call_data*>(elem->call_data);
calld->owning_call = args->call_stack;
calld->call_combiner = args->call_combiner;
+ calld->host = grpc_empty_slice();
+ calld->method = grpc_empty_slice();
return GRPC_ERROR_NONE;
}
@@ -367,12 +355,8 @@ static void destroy_call_elem(grpc_call_element* elem,
call_data* calld = static_cast<call_data*>(elem->call_data);
grpc_credentials_mdelem_array_destroy(&calld->md_array);
grpc_call_credentials_unref(calld->creds);
- if (calld->have_host) {
- grpc_slice_unref_internal(calld->host);
- }
- if (calld->have_method) {
- grpc_slice_unref_internal(calld->method);
- }
+ grpc_slice_unref_internal(calld->host);
+ grpc_slice_unref_internal(calld->method);
grpc_auth_metadata_context_reset(&calld->auth_md_context);
}
diff --git a/src/core/lib/transport/byte_stream.cc b/src/core/lib/transport/byte_stream.cc
index 1aaf40fb99..cb15a71a91 100644
--- a/src/core/lib/transport/byte_stream.cc
+++ b/src/core/lib/transport/byte_stream.cc
@@ -79,17 +79,19 @@ void SliceBufferByteStream::Shutdown(grpc_error* error) {
//
ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream)
- : underlying_stream_(std::move(underlying_stream)) {
+ : underlying_stream_(std::move(underlying_stream)),
+ length_(underlying_stream_->length()),
+ flags_(underlying_stream_->flags()) {
grpc_slice_buffer_init(&cache_buffer_);
}
-ByteStreamCache::~ByteStreamCache() {
- if (underlying_stream_ != nullptr) Destroy();
-}
+ByteStreamCache::~ByteStreamCache() { Destroy(); }
void ByteStreamCache::Destroy() {
underlying_stream_.reset();
- grpc_slice_buffer_destroy_internal(&cache_buffer_);
+ if (cache_buffer_.length > 0) {
+ grpc_slice_buffer_destroy_internal(&cache_buffer_);
+ }
}
//
@@ -97,9 +99,7 @@ void ByteStreamCache::Destroy() {
//
ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache)
- : ByteStream(cache->underlying_stream_->length(),
- cache->underlying_stream_->flags()),
- cache_(cache) {}
+ : ByteStream(cache->length_, cache->flags_), cache_(cache) {}
ByteStreamCache::CachingByteStream::~CachingByteStream() {}
@@ -115,6 +115,7 @@ bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint,
grpc_closure* on_complete) {
if (shutdown_error_ != GRPC_ERROR_NONE) return true;
if (cursor_ < cache_->cache_buffer_.count) return true;
+ GPR_ASSERT(cache_->underlying_stream_ != nullptr);
return cache_->underlying_stream_->Next(max_size_hint, on_complete);
}
@@ -125,13 +126,20 @@ grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) {
if (cursor_ < cache_->cache_buffer_.count) {
*slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]);
++cursor_;
+ offset_ += GRPC_SLICE_LENGTH(*slice);
return GRPC_ERROR_NONE;
}
+ GPR_ASSERT(cache_->underlying_stream_ != nullptr);
grpc_error* error = cache_->underlying_stream_->Pull(slice);
if (error == GRPC_ERROR_NONE) {
- ++cursor_;
grpc_slice_buffer_add(&cache_->cache_buffer_,
grpc_slice_ref_internal(*slice));
+ ++cursor_;
+ offset_ += GRPC_SLICE_LENGTH(*slice);
+ // Orphan the underlying stream if it's been drained.
+ if (offset_ == cache_->underlying_stream_->length()) {
+ cache_->underlying_stream_.reset();
+ }
}
return error;
}
@@ -139,9 +147,14 @@ grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) {
void ByteStreamCache::CachingByteStream::Shutdown(grpc_error* error) {
GRPC_ERROR_UNREF(shutdown_error_);
shutdown_error_ = GRPC_ERROR_REF(error);
- cache_->underlying_stream_->Shutdown(error);
+ if (cache_->underlying_stream_ != nullptr) {
+ cache_->underlying_stream_->Shutdown(error);
+ }
}
-void ByteStreamCache::CachingByteStream::Reset() { cursor_ = 0; }
+void ByteStreamCache::CachingByteStream::Reset() {
+ cursor_ = 0;
+ offset_ = 0;
+}
} // namespace grpc_core
diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h
index f8243ac40d..95a756a5e4 100644
--- a/src/core/lib/transport/byte_stream.h
+++ b/src/core/lib/transport/byte_stream.h
@@ -139,6 +139,7 @@ class ByteStreamCache {
private:
ByteStreamCache* cache_;
size_t cursor_ = 0;
+ size_t offset_ = 0;
grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
};
@@ -153,6 +154,8 @@ class ByteStreamCache {
private:
OrphanablePtr<ByteStream> underlying_stream_;
+ uint32_t length_;
+ uint32_t flags_;
grpc_slice_buffer cache_buffer_;
};