aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport
diff options
context:
space:
mode:
authorGravatar kpayson64 <kpayson@google.com>2018-05-11 12:20:11 -0700
committerGravatar kpayson64 <kpayson@google.com>2018-05-11 12:20:11 -0700
commit4fad281ce8affe27fb7428f264d2c3b9dfc45f2f (patch)
treeca96c9efd69afec56aa2e5fe072a9f758247d0a3 /src/core/lib/transport
parentec445cc2bb270ed4acb1c710c3533fca14a50019 (diff)
parent61fdb46ac456027c79841949272ec540f66d2317 (diff)
Merge remote-tracking branch 'upstream/master' into fork_exec_ctx_check
Diffstat (limited to 'src/core/lib/transport')
-rw-r--r--src/core/lib/transport/bdp_estimator.cc6
-rw-r--r--src/core/lib/transport/bdp_estimator.h4
-rw-r--r--src/core/lib/transport/byte_stream.cc208
-rw-r--r--src/core/lib/transport/byte_stream.h187
-rw-r--r--src/core/lib/transport/connectivity_state.cc13
-rw-r--r--src/core/lib/transport/connectivity_state.h2
-rw-r--r--src/core/lib/transport/metadata.h2
-rw-r--r--src/core/lib/transport/metadata_batch.h1
-rw-r--r--src/core/lib/transport/status_conversion.h1
-rw-r--r--src/core/lib/transport/transport.cc2
-rw-r--r--src/core/lib/transport/transport.h11
-rw-r--r--src/core/lib/transport/transport_op_string.cc13
12 files changed, 231 insertions, 219 deletions
diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc
index 8130535ddd..8e71f86989 100644
--- a/src/core/lib/transport/bdp_estimator.cc
+++ b/src/core/lib/transport/bdp_estimator.cc
@@ -47,7 +47,7 @@ grpc_millis BdpEstimator::CompletePing() {
double bw = dt > 0 ? (static_cast<double>(accumulator_) / dt) : 0;
int start_inter_ping_delay = inter_ping_delay_;
if (grpc_bdp_estimator_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"bdp[%s]:complete acc=%" PRId64 " est=%" PRId64
" dt=%lf bw=%lfMbs bw_est=%lfMbs",
name_, accumulator_, estimate_, dt, bw / 125000.0,
@@ -58,7 +58,7 @@ grpc_millis BdpEstimator::CompletePing() {
estimate_ = GPR_MAX(accumulator_, estimate_ * 2);
bw_est_ = bw;
if (grpc_bdp_estimator_trace.enabled()) {
- gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64, name_,
+ gpr_log(GPR_INFO, "bdp[%s]: estimate increased to %" PRId64, name_,
estimate_);
}
inter_ping_delay_ /= 2; // if the ping estimate changes,
@@ -75,7 +75,7 @@ grpc_millis BdpEstimator::CompletePing() {
if (start_inter_ping_delay != inter_ping_delay_) {
stable_estimate_count_ = 0;
if (grpc_bdp_estimator_trace.enabled()) {
- gpr_log(GPR_DEBUG, "bdp[%s]:update_inter_time to %dms", name_,
+ gpr_log(GPR_INFO, "bdp[%s]:update_inter_time to %dms", name_,
inter_ping_delay_);
}
}
diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h
index e703af121c..ab13ae4be4 100644
--- a/src/core/lib/transport/bdp_estimator.h
+++ b/src/core/lib/transport/bdp_estimator.h
@@ -50,7 +50,7 @@ class BdpEstimator {
// transport (but not necessarily started)
void SchedulePing() {
if (grpc_bdp_estimator_trace.enabled()) {
- gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64, name_,
+ gpr_log(GPR_INFO, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64, name_,
accumulator_, estimate_);
}
GPR_ASSERT(ping_state_ == PingState::UNSCHEDULED);
@@ -63,7 +63,7 @@ class BdpEstimator {
// the ping is on the wire
void StartPing() {
if (grpc_bdp_estimator_trace.enabled()) {
- gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64, name_,
+ gpr_log(GPR_INFO, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64, name_,
accumulator_, estimate_);
}
GPR_ASSERT(ping_state_ == PingState::SCHEDULED);
diff --git a/src/core/lib/transport/byte_stream.cc b/src/core/lib/transport/byte_stream.cc
index e1751f8010..cb15a71a91 100644
--- a/src/core/lib/transport/byte_stream.cc
+++ b/src/core/lib/transport/byte_stream.cc
@@ -25,160 +25,136 @@
#include <grpc/support/log.h>
+#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/slice/slice_internal.h"
-bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint,
- grpc_closure* on_complete) {
- return byte_stream->vtable->next(byte_stream, max_size_hint, on_complete);
-}
+namespace grpc_core {
-grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream,
- grpc_slice* slice) {
- return byte_stream->vtable->pull(byte_stream, slice);
-}
+//
+// SliceBufferByteStream
+//
-void grpc_byte_stream_shutdown(grpc_byte_stream* byte_stream,
- grpc_error* error) {
- byte_stream->vtable->shutdown(byte_stream, error);
+SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer,
+ uint32_t flags)
+ : ByteStream(static_cast<uint32_t>(slice_buffer->length), flags) {
+ GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
+ grpc_slice_buffer_init(&backing_buffer_);
+ grpc_slice_buffer_swap(slice_buffer, &backing_buffer_);
}
-void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream) {
- byte_stream->vtable->destroy(byte_stream);
-}
+SliceBufferByteStream::~SliceBufferByteStream() {}
-// grpc_slice_buffer_stream
+void SliceBufferByteStream::Orphan() {
+ grpc_slice_buffer_destroy(&backing_buffer_);
+ GRPC_ERROR_UNREF(shutdown_error_);
+ // Note: We do not actually delete the object here, since
+ // SliceBufferByteStream is usually allocated as part of a larger
+ // object and has an OrphanablePtr of itself passed down through the
+ // filter stack.
+}
-static bool slice_buffer_stream_next(grpc_byte_stream* byte_stream,
- size_t max_size_hint,
- grpc_closure* on_complete) {
- grpc_slice_buffer_stream* stream =
- reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream);
- GPR_ASSERT(stream->cursor < stream->backing_buffer.count);
+bool SliceBufferByteStream::Next(size_t max_size_hint,
+ grpc_closure* on_complete) {
+ GPR_ASSERT(cursor_ < backing_buffer_.count);
return true;
}
-static grpc_error* slice_buffer_stream_pull(grpc_byte_stream* byte_stream,
- grpc_slice* slice) {
- grpc_slice_buffer_stream* stream =
- reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream);
- if (stream->shutdown_error != GRPC_ERROR_NONE) {
- return GRPC_ERROR_REF(stream->shutdown_error);
+grpc_error* SliceBufferByteStream::Pull(grpc_slice* slice) {
+ if (shutdown_error_ != GRPC_ERROR_NONE) {
+ return GRPC_ERROR_REF(shutdown_error_);
}
- GPR_ASSERT(stream->cursor < stream->backing_buffer.count);
- *slice =
- grpc_slice_ref_internal(stream->backing_buffer.slices[stream->cursor]);
- stream->cursor++;
+ GPR_ASSERT(cursor_ < backing_buffer_.count);
+ *slice = grpc_slice_ref_internal(backing_buffer_.slices[cursor_]);
+ ++cursor_;
return GRPC_ERROR_NONE;
}
-static void slice_buffer_stream_shutdown(grpc_byte_stream* byte_stream,
- grpc_error* error) {
- grpc_slice_buffer_stream* stream =
- reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream);
- GRPC_ERROR_UNREF(stream->shutdown_error);
- stream->shutdown_error = error;
+void SliceBufferByteStream::Shutdown(grpc_error* error) {
+ GRPC_ERROR_UNREF(shutdown_error_);
+ shutdown_error_ = error;
}
-static void slice_buffer_stream_destroy(grpc_byte_stream* byte_stream) {
- grpc_slice_buffer_stream* stream =
- reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream);
- grpc_slice_buffer_destroy(&stream->backing_buffer);
- GRPC_ERROR_UNREF(stream->shutdown_error);
+//
+// ByteStreamCache
+//
+
+ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream)
+ : underlying_stream_(std::move(underlying_stream)),
+ length_(underlying_stream_->length()),
+ flags_(underlying_stream_->flags()) {
+ grpc_slice_buffer_init(&cache_buffer_);
}
-static const grpc_byte_stream_vtable slice_buffer_stream_vtable = {
- slice_buffer_stream_next, slice_buffer_stream_pull,
- slice_buffer_stream_shutdown, slice_buffer_stream_destroy};
+ByteStreamCache::~ByteStreamCache() { Destroy(); }
-void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream,
- grpc_slice_buffer* slice_buffer,
- uint32_t flags) {
- GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
- stream->base.length = static_cast<uint32_t>(slice_buffer->length);
- stream->base.flags = flags;
- stream->base.vtable = &slice_buffer_stream_vtable;
- grpc_slice_buffer_init(&stream->backing_buffer);
- grpc_slice_buffer_swap(slice_buffer, &stream->backing_buffer);
- stream->cursor = 0;
- stream->shutdown_error = GRPC_ERROR_NONE;
+void ByteStreamCache::Destroy() {
+ underlying_stream_.reset();
+ if (cache_buffer_.length > 0) {
+ grpc_slice_buffer_destroy_internal(&cache_buffer_);
+ }
}
-// grpc_caching_byte_stream
+//
+// ByteStreamCache::CachingByteStream
+//
-void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache,
- grpc_byte_stream* underlying_stream) {
- cache->underlying_stream = underlying_stream;
- grpc_slice_buffer_init(&cache->cache_buffer);
-}
+ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache)
+ : ByteStream(cache->length_, cache->flags_), cache_(cache) {}
-void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache* cache) {
- grpc_byte_stream_destroy(cache->underlying_stream);
- grpc_slice_buffer_destroy_internal(&cache->cache_buffer);
+ByteStreamCache::CachingByteStream::~CachingByteStream() {}
+
+void ByteStreamCache::CachingByteStream::Orphan() {
+ GRPC_ERROR_UNREF(shutdown_error_);
+ // Note: We do not actually delete the object here, since
+ // CachingByteStream is usually allocated as part of a larger
+ // object and has an OrphanablePtr of itself passed down through the
+ // filter stack.
}
-static bool caching_byte_stream_next(grpc_byte_stream* byte_stream,
- size_t max_size_hint,
- grpc_closure* on_complete) {
- grpc_caching_byte_stream* stream =
- reinterpret_cast<grpc_caching_byte_stream*>(byte_stream);
- if (stream->shutdown_error != GRPC_ERROR_NONE) return true;
- if (stream->cursor < stream->cache->cache_buffer.count) return true;
- return grpc_byte_stream_next(stream->cache->underlying_stream, max_size_hint,
- on_complete);
+bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint,
+ grpc_closure* on_complete) {
+ if (shutdown_error_ != GRPC_ERROR_NONE) return true;
+ if (cursor_ < cache_->cache_buffer_.count) return true;
+ GPR_ASSERT(cache_->underlying_stream_ != nullptr);
+ return cache_->underlying_stream_->Next(max_size_hint, on_complete);
}
-static grpc_error* caching_byte_stream_pull(grpc_byte_stream* byte_stream,
- grpc_slice* slice) {
- grpc_caching_byte_stream* stream =
- reinterpret_cast<grpc_caching_byte_stream*>(byte_stream);
- if (stream->shutdown_error != GRPC_ERROR_NONE) {
- return GRPC_ERROR_REF(stream->shutdown_error);
+grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) {
+ if (shutdown_error_ != GRPC_ERROR_NONE) {
+ return GRPC_ERROR_REF(shutdown_error_);
}
- if (stream->cursor < stream->cache->cache_buffer.count) {
- *slice = grpc_slice_ref_internal(
- stream->cache->cache_buffer.slices[stream->cursor]);
- ++stream->cursor;
+ if (cursor_ < cache_->cache_buffer_.count) {
+ *slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]);
+ ++cursor_;
+ offset_ += GRPC_SLICE_LENGTH(*slice);
return GRPC_ERROR_NONE;
}
- grpc_error* error =
- grpc_byte_stream_pull(stream->cache->underlying_stream, slice);
+ GPR_ASSERT(cache_->underlying_stream_ != nullptr);
+ grpc_error* error = cache_->underlying_stream_->Pull(slice);
if (error == GRPC_ERROR_NONE) {
- ++stream->cursor;
- grpc_slice_buffer_add(&stream->cache->cache_buffer,
+ grpc_slice_buffer_add(&cache_->cache_buffer_,
grpc_slice_ref_internal(*slice));
+ ++cursor_;
+ offset_ += GRPC_SLICE_LENGTH(*slice);
+ // Orphan the underlying stream if it's been drained.
+ if (offset_ == cache_->underlying_stream_->length()) {
+ cache_->underlying_stream_.reset();
+ }
}
return error;
}
-static void caching_byte_stream_shutdown(grpc_byte_stream* byte_stream,
- grpc_error* error) {
- grpc_caching_byte_stream* stream =
- reinterpret_cast<grpc_caching_byte_stream*>(byte_stream);
- GRPC_ERROR_UNREF(stream->shutdown_error);
- stream->shutdown_error = GRPC_ERROR_REF(error);
- grpc_byte_stream_shutdown(stream->cache->underlying_stream, error);
-}
-
-static void caching_byte_stream_destroy(grpc_byte_stream* byte_stream) {
- grpc_caching_byte_stream* stream =
- reinterpret_cast<grpc_caching_byte_stream*>(byte_stream);
- GRPC_ERROR_UNREF(stream->shutdown_error);
+void ByteStreamCache::CachingByteStream::Shutdown(grpc_error* error) {
+ GRPC_ERROR_UNREF(shutdown_error_);
+ shutdown_error_ = GRPC_ERROR_REF(error);
+ if (cache_->underlying_stream_ != nullptr) {
+ cache_->underlying_stream_->Shutdown(error);
+ }
}
-static const grpc_byte_stream_vtable caching_byte_stream_vtable = {
- caching_byte_stream_next, caching_byte_stream_pull,
- caching_byte_stream_shutdown, caching_byte_stream_destroy};
-
-void grpc_caching_byte_stream_init(grpc_caching_byte_stream* stream,
- grpc_byte_stream_cache* cache) {
- memset(stream, 0, sizeof(*stream));
- stream->base.length = cache->underlying_stream->length;
- stream->base.flags = cache->underlying_stream->flags;
- stream->base.vtable = &caching_byte_stream_vtable;
- stream->cache = cache;
- stream->shutdown_error = GRPC_ERROR_NONE;
+void ByteStreamCache::CachingByteStream::Reset() {
+ cursor_ = 0;
+ offset_ = 0;
}
-void grpc_caching_byte_stream_reset(grpc_caching_byte_stream* stream) {
- stream->cursor = 0;
-}
+} // namespace grpc_core
diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h
index 4d3c3c131b..eff832515d 100644
--- a/src/core/lib/transport/byte_stream.h
+++ b/src/core/lib/transport/byte_stream.h
@@ -22,7 +22,9 @@
#include <grpc/support/port_platform.h>
#include <grpc/slice_buffer.h>
-#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/iomgr/closure.h"
/** Internal bit flag for grpc_begin_message's \a flags signaling the use of
* compression for the message */
@@ -30,71 +32,82 @@
/** Mask of all valid internal flags. */
#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
-typedef struct grpc_byte_stream grpc_byte_stream;
-
-typedef struct {
- bool (*next)(grpc_byte_stream* byte_stream, size_t max_size_hint,
- grpc_closure* on_complete);
- grpc_error* (*pull)(grpc_byte_stream* byte_stream, grpc_slice* slice);
- void (*shutdown)(grpc_byte_stream* byte_stream, grpc_error* error);
- void (*destroy)(grpc_byte_stream* byte_stream);
-} grpc_byte_stream_vtable;
-
-struct grpc_byte_stream {
- uint32_t length;
- uint32_t flags;
- const grpc_byte_stream_vtable* vtable;
+namespace grpc_core {
+
+class ByteStream : public Orphanable {
+ public:
+ virtual ~ByteStream() {}
+
+ // Returns true if the bytes are available immediately (in which case
+ // on_complete will not be called), or false if the bytes will be available
+ // asynchronously (in which case on_complete will be called when they
+ // are available).
+ //
+ // max_size_hint can be set as a hint as to the maximum number
+ // of bytes that would be acceptable to read.
+ virtual bool Next(size_t max_size_hint,
+ grpc_closure* on_complete) GRPC_ABSTRACT;
+
+ // Returns the next slice in the byte stream when it is available, as
+ // indicated by Next().
+ //
+ // Once a slice is returned into *slice, it is owned by the caller.
+ virtual grpc_error* Pull(grpc_slice* slice) GRPC_ABSTRACT;
+
+ // Shuts down the byte stream.
+ //
+ // If there is a pending call to on_complete from Next(), it will be
+ // invoked with the error passed to Shutdown().
+ //
+ // The next call to Pull() (if any) will return the error passed to
+ // Shutdown().
+ virtual void Shutdown(grpc_error* error) GRPC_ABSTRACT;
+
+ uint32_t length() const { return length_; }
+ uint32_t flags() const { return flags_; }
+
+ void set_flags(uint32_t flags) { flags_ = flags; }
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ ByteStream(uint32_t length, uint32_t flags)
+ : length_(length), flags_(flags) {}
+
+ private:
+ const uint32_t length_;
+ uint32_t flags_;
};
-// Returns true if the bytes are available immediately (in which case
-// on_complete will not be called), false if the bytes will be available
-// asynchronously.
//
-// max_size_hint can be set as a hint as to the maximum number
-// of bytes that would be acceptable to read.
-bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint,
- grpc_closure* on_complete);
-
-// Returns the next slice in the byte stream when it is ready (indicated by
-// either grpc_byte_stream_next returning true or on_complete passed to
-// grpc_byte_stream_next is called).
+// SliceBufferByteStream
//
-// Once a slice is returned into *slice, it is owned by the caller.
-grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream,
- grpc_slice* slice);
-
-// Shuts down the byte stream.
+// A ByteStream that wraps a slice buffer.
//
-// If there is a pending call to on_complete from grpc_byte_stream_next(),
-// it will be invoked with the error passed to grpc_byte_stream_shutdown().
-//
-// The next call to grpc_byte_stream_pull() (if any) will return the error
-// passed to grpc_byte_stream_shutdown().
-void grpc_byte_stream_shutdown(grpc_byte_stream* byte_stream,
- grpc_error* error);
-void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream);
+class SliceBufferByteStream : public ByteStream {
+ public:
+ // Removes all slices in slice_buffer, leaving it empty.
+ SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags);
+
+ ~SliceBufferByteStream();
+
+ void Orphan() override;
+
+ bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
+ grpc_error* Pull(grpc_slice* slice) override;
+ void Shutdown(grpc_error* error) override;
+
+ private:
+ grpc_slice_buffer backing_buffer_;
+ size_t cursor_ = 0;
+ grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
+};
-// grpc_slice_buffer_stream
//
-// A grpc_byte_stream that wraps a slice buffer. The stream takes
-// ownership of the slices in the buffer, and on destruction will
-// reset the contents of the buffer.
-
-typedef struct grpc_slice_buffer_stream {
- grpc_byte_stream base;
- grpc_slice_buffer backing_buffer;
- size_t cursor;
- grpc_error* shutdown_error;
-} grpc_slice_buffer_stream;
-
-void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream,
- grpc_slice_buffer* slice_buffer,
- uint32_t flags);
-
-// grpc_caching_byte_stream
+// CachingByteStream
//
-// A grpc_byte_stream that that wraps an underlying byte stream but caches
+// A ByteStream that that wraps an underlying byte stream but caches
// the resulting slices in a slice buffer. If an initial attempt fails
// without fully draining the underlying stream, a new caching stream
// can be created from the same underlying cache, in which case it will
@@ -102,32 +115,50 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream,
// underlying stream.
//
// NOTE: No synchronization is done, so it is not safe to have multiple
-// grpc_caching_byte_streams simultaneously drawing from the same underlying
-// grpc_byte_stream_cache at the same time.
+// CachingByteStreams simultaneously drawing from the same underlying
+// ByteStreamCache at the same time.
+//
+
+class ByteStreamCache {
+ public:
+ class CachingByteStream : public ByteStream {
+ public:
+ explicit CachingByteStream(ByteStreamCache* cache);
+
+ ~CachingByteStream();
+
+ void Orphan() override;
-typedef struct {
- grpc_byte_stream* underlying_stream;
- grpc_slice_buffer cache_buffer;
-} grpc_byte_stream_cache;
+ bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
+ grpc_error* Pull(grpc_slice* slice) override;
+ void Shutdown(grpc_error* error) override;
-// Takes ownership of underlying_stream.
-void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache,
- grpc_byte_stream* underlying_stream);
+ // Resets the byte stream to the start of the underlying stream.
+ void Reset();
-// Must not be called while still in use by a grpc_caching_byte_stream.
-void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache* cache);
+ private:
+ ByteStreamCache* cache_;
+ size_t cursor_ = 0;
+ size_t offset_ = 0;
+ grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
+ };
-typedef struct {
- grpc_byte_stream base;
- grpc_byte_stream_cache* cache;
- size_t cursor;
- grpc_error* shutdown_error;
-} grpc_caching_byte_stream;
+ explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream);
-void grpc_caching_byte_stream_init(grpc_caching_byte_stream* stream,
- grpc_byte_stream_cache* cache);
+ ~ByteStreamCache();
+
+ // Must not be destroyed while still in use by a CachingByteStream.
+ void Destroy();
+
+ grpc_slice_buffer* cache_buffer() { return &cache_buffer_; }
+
+ private:
+ OrphanablePtr<ByteStream> underlying_stream_;
+ uint32_t length_;
+ uint32_t flags_;
+ grpc_slice_buffer cache_buffer_;
+};
-// Resets the byte stream to the start of the underlying stream.
-void grpc_caching_byte_stream_reset(grpc_caching_byte_stream* stream);
+} // namespace grpc_core
#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */
diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc
index 0122e773ca..db6b6c0444 100644
--- a/src/core/lib/transport/connectivity_state.cc
+++ b/src/core/lib/transport/connectivity_state.cc
@@ -78,7 +78,7 @@ grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
gpr_atm_no_barrier_load(&tracker->current_state_atm));
if (grpc_connectivity_state_trace.enabled()) {
- gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name,
+ gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name,
grpc_connectivity_state_name(cur));
}
return cur;
@@ -89,7 +89,7 @@ grpc_connectivity_state grpc_connectivity_state_get(
grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
gpr_atm_no_barrier_load(&tracker->current_state_atm));
if (grpc_connectivity_state_trace.enabled()) {
- gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name,
+ gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name,
grpc_connectivity_state_name(cur));
}
if (error != nullptr) {
@@ -110,10 +110,10 @@ bool grpc_connectivity_state_notify_on_state_change(
gpr_atm_no_barrier_load(&tracker->current_state_atm));
if (grpc_connectivity_state_trace.enabled()) {
if (current == nullptr) {
- gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", tracker,
+ gpr_log(GPR_INFO, "CONWATCH: %p %s: unsubscribe notify=%p", tracker,
tracker->name, notify);
} else {
- gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker,
+ gpr_log(GPR_INFO, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker,
tracker->name, grpc_connectivity_state_name(*current),
grpc_connectivity_state_name(cur), notify);
}
@@ -161,7 +161,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
grpc_connectivity_state_watcher* w;
if (grpc_connectivity_state_trace.enabled()) {
const char* error_string = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker,
+ gpr_log(GPR_INFO, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker,
tracker->name, grpc_connectivity_state_name(cur),
grpc_connectivity_state_name(state), reason, error, error_string);
}
@@ -187,8 +187,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
*w->current = state;
tracker->watchers = w->next;
if (grpc_connectivity_state_trace.enabled()) {
- gpr_log(GPR_DEBUG, "NOTIFY: %p %s: %p", tracker, tracker->name,
- w->notify);
+ gpr_log(GPR_INFO, "NOTIFY: %p %s: %p", tracker, tracker->name, w->notify);
}
GRPC_CLOSURE_SCHED(w->notify, GRPC_ERROR_REF(tracker->current_error));
gpr_free(w);
diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h
index 421db5aa39..ecb083cfc2 100644
--- a/src/core/lib/transport/connectivity_state.h
+++ b/src/core/lib/transport/connectivity_state.h
@@ -23,7 +23,7 @@
#include <grpc/grpc.h>
#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/closure.h"
typedef struct grpc_connectivity_state_watcher {
/** we keep watchers in a linked list */
diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h
index 5e0ecbdb96..78df4bc3a3 100644
--- a/src/core/lib/transport/metadata.h
+++ b/src/core/lib/transport/metadata.h
@@ -24,8 +24,8 @@
#include <grpc/grpc.h>
#include <grpc/slice.h>
+#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
extern grpc_core::DebugOnlyTraceFlag grpc_trace_metadata;
diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h
index 3876063b52..7068750b6f 100644
--- a/src/core/lib/transport/metadata_batch.h
+++ b/src/core/lib/transport/metadata_batch.h
@@ -26,6 +26,7 @@
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/support/time.h>
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/static_metadata.h"
diff --git a/src/core/lib/transport/status_conversion.h b/src/core/lib/transport/status_conversion.h
index 9f14e9bee0..487f00c08b 100644
--- a/src/core/lib/transport/status_conversion.h
+++ b/src/core/lib/transport/status_conversion.h
@@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
+
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/http2_errors.h"
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index c90d16fc32..6b41e4b37e 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -209,7 +209,7 @@ void grpc_transport_stream_op_batch_finish_with_failure(
grpc_transport_stream_op_batch* batch, grpc_error* error,
grpc_call_combiner* call_combiner) {
if (batch->send_message) {
- grpc_byte_stream_destroy(batch->payload->send_message.send_message);
+ batch->payload->send_message.send_message.reset();
}
if (batch->recv_message) {
GRPC_CALL_COMBINER_START(
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 37e50344c4..10e9df0f7c 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -184,11 +184,10 @@ struct grpc_transport_stream_op_batch_payload {
struct {
// The transport (or a filter that decides to return a failure before
- // the op gets down to the transport) is responsible for calling
- // grpc_byte_stream_destroy() on this.
+ // the op gets down to the transport) takes ownership.
// The batch's on_complete will not be called until after the byte
- // stream is destroyed.
- grpc_byte_stream* send_message;
+ // stream is orphaned.
+ grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message;
} send_message;
struct {
@@ -216,10 +215,8 @@ struct grpc_transport_stream_op_batch_payload {
struct {
// Will be set by the transport to point to the byte stream
// containing a received message.
- // The caller is responsible for calling grpc_byte_stream_destroy()
- // on this byte stream.
// Will be NULL if trailing metadata is received instead of a message.
- grpc_byte_stream** recv_message;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
/** Should be enqueued when one message is ready to be processed. */
grpc_closure* recv_message_ready;
} recv_message;
diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc
index 6898da17ed..99af7c1931 100644
--- a/src/core/lib/transport/transport_op_string.cc
+++ b/src/core/lib/transport/transport_op_string.cc
@@ -75,9 +75,16 @@ char* grpc_transport_stream_op_batch_string(
if (op->send_message) {
gpr_strvec_add(&b, gpr_strdup(" "));
- gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d",
- op->payload->send_message.send_message->flags,
- op->payload->send_message.send_message->length);
+ if (op->payload->send_message.send_message != nullptr) {
+ gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d",
+ op->payload->send_message.send_message->flags(),
+ op->payload->send_message.send_message->length());
+ } else {
+ // This can happen when we check a batch after the transport has
+ // processed and cleared the send_message op.
+ tmp =
+ gpr_strdup("SEND_MESSAGE(flag and length unknown, already orphaned)");
+ }
gpr_strvec_add(&b, tmp);
}