diff options
author | 2018-05-11 12:20:11 -0700 | |
---|---|---|
committer | 2018-05-11 12:20:11 -0700 | |
commit | 4fad281ce8affe27fb7428f264d2c3b9dfc45f2f (patch) | |
tree | ca96c9efd69afec56aa2e5fe072a9f758247d0a3 /src/core/lib/transport | |
parent | ec445cc2bb270ed4acb1c710c3533fca14a50019 (diff) | |
parent | 61fdb46ac456027c79841949272ec540f66d2317 (diff) |
Merge remote-tracking branch 'upstream/master' into fork_exec_ctx_check
Diffstat (limited to 'src/core/lib/transport')
-rw-r--r-- | src/core/lib/transport/bdp_estimator.cc | 6 | ||||
-rw-r--r-- | src/core/lib/transport/bdp_estimator.h | 4 | ||||
-rw-r--r-- | src/core/lib/transport/byte_stream.cc | 208 | ||||
-rw-r--r-- | src/core/lib/transport/byte_stream.h | 187 | ||||
-rw-r--r-- | src/core/lib/transport/connectivity_state.cc | 13 | ||||
-rw-r--r-- | src/core/lib/transport/connectivity_state.h | 2 | ||||
-rw-r--r-- | src/core/lib/transport/metadata.h | 2 | ||||
-rw-r--r-- | src/core/lib/transport/metadata_batch.h | 1 | ||||
-rw-r--r-- | src/core/lib/transport/status_conversion.h | 1 | ||||
-rw-r--r-- | src/core/lib/transport/transport.cc | 2 | ||||
-rw-r--r-- | src/core/lib/transport/transport.h | 11 | ||||
-rw-r--r-- | src/core/lib/transport/transport_op_string.cc | 13 |
12 files changed, 231 insertions, 219 deletions
diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc index 8130535ddd..8e71f86989 100644 --- a/src/core/lib/transport/bdp_estimator.cc +++ b/src/core/lib/transport/bdp_estimator.cc @@ -47,7 +47,7 @@ grpc_millis BdpEstimator::CompletePing() { double bw = dt > 0 ? (static_cast<double>(accumulator_) / dt) : 0; int start_inter_ping_delay = inter_ping_delay_; if (grpc_bdp_estimator_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64 " dt=%lf bw=%lfMbs bw_est=%lfMbs", name_, accumulator_, estimate_, dt, bw / 125000.0, @@ -58,7 +58,7 @@ grpc_millis BdpEstimator::CompletePing() { estimate_ = GPR_MAX(accumulator_, estimate_ * 2); bw_est_ = bw; if (grpc_bdp_estimator_trace.enabled()) { - gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64, name_, + gpr_log(GPR_INFO, "bdp[%s]: estimate increased to %" PRId64, name_, estimate_); } inter_ping_delay_ /= 2; // if the ping estimate changes, @@ -75,7 +75,7 @@ grpc_millis BdpEstimator::CompletePing() { if (start_inter_ping_delay != inter_ping_delay_) { stable_estimate_count_ = 0; if (grpc_bdp_estimator_trace.enabled()) { - gpr_log(GPR_DEBUG, "bdp[%s]:update_inter_time to %dms", name_, + gpr_log(GPR_INFO, "bdp[%s]:update_inter_time to %dms", name_, inter_ping_delay_); } } diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h index e703af121c..ab13ae4be4 100644 --- a/src/core/lib/transport/bdp_estimator.h +++ b/src/core/lib/transport/bdp_estimator.h @@ -50,7 +50,7 @@ class BdpEstimator { // transport (but not necessarily started) void SchedulePing() { if (grpc_bdp_estimator_trace.enabled()) { - gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64, name_, + gpr_log(GPR_INFO, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64, name_, accumulator_, estimate_); } GPR_ASSERT(ping_state_ == PingState::UNSCHEDULED); @@ -63,7 +63,7 @@ class BdpEstimator { // the ping is on the wire void StartPing() { if (grpc_bdp_estimator_trace.enabled()) { - gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64, name_, + gpr_log(GPR_INFO, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64, name_, accumulator_, estimate_); } GPR_ASSERT(ping_state_ == PingState::SCHEDULED); diff --git a/src/core/lib/transport/byte_stream.cc b/src/core/lib/transport/byte_stream.cc index e1751f8010..cb15a71a91 100644 --- a/src/core/lib/transport/byte_stream.cc +++ b/src/core/lib/transport/byte_stream.cc @@ -25,160 +25,136 @@ #include <grpc/support/log.h> +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/slice/slice_internal.h" -bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint, - grpc_closure* on_complete) { - return byte_stream->vtable->next(byte_stream, max_size_hint, on_complete); -} +namespace grpc_core { -grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream, - grpc_slice* slice) { - return byte_stream->vtable->pull(byte_stream, slice); -} +// +// SliceBufferByteStream +// -void grpc_byte_stream_shutdown(grpc_byte_stream* byte_stream, - grpc_error* error) { - byte_stream->vtable->shutdown(byte_stream, error); +SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer, + uint32_t flags) + : ByteStream(static_cast<uint32_t>(slice_buffer->length), flags) { + GPR_ASSERT(slice_buffer->length <= UINT32_MAX); + grpc_slice_buffer_init(&backing_buffer_); + grpc_slice_buffer_swap(slice_buffer, &backing_buffer_); } -void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream) { - byte_stream->vtable->destroy(byte_stream); -} +SliceBufferByteStream::~SliceBufferByteStream() {} -// grpc_slice_buffer_stream +void SliceBufferByteStream::Orphan() { + grpc_slice_buffer_destroy(&backing_buffer_); + GRPC_ERROR_UNREF(shutdown_error_); + // Note: We do not actually delete the object here, since + // SliceBufferByteStream is usually allocated as part of a larger + // object and has an OrphanablePtr of itself passed down through the + // filter stack. +} -static bool slice_buffer_stream_next(grpc_byte_stream* byte_stream, - size_t max_size_hint, - grpc_closure* on_complete) { - grpc_slice_buffer_stream* stream = - reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream); - GPR_ASSERT(stream->cursor < stream->backing_buffer.count); +bool SliceBufferByteStream::Next(size_t max_size_hint, + grpc_closure* on_complete) { + GPR_ASSERT(cursor_ < backing_buffer_.count); return true; } -static grpc_error* slice_buffer_stream_pull(grpc_byte_stream* byte_stream, - grpc_slice* slice) { - grpc_slice_buffer_stream* stream = - reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream); - if (stream->shutdown_error != GRPC_ERROR_NONE) { - return GRPC_ERROR_REF(stream->shutdown_error); +grpc_error* SliceBufferByteStream::Pull(grpc_slice* slice) { + if (shutdown_error_ != GRPC_ERROR_NONE) { + return GRPC_ERROR_REF(shutdown_error_); } - GPR_ASSERT(stream->cursor < stream->backing_buffer.count); - *slice = - grpc_slice_ref_internal(stream->backing_buffer.slices[stream->cursor]); - stream->cursor++; + GPR_ASSERT(cursor_ < backing_buffer_.count); + *slice = grpc_slice_ref_internal(backing_buffer_.slices[cursor_]); + ++cursor_; return GRPC_ERROR_NONE; } -static void slice_buffer_stream_shutdown(grpc_byte_stream* byte_stream, - grpc_error* error) { - grpc_slice_buffer_stream* stream = - reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream); - GRPC_ERROR_UNREF(stream->shutdown_error); - stream->shutdown_error = error; +void SliceBufferByteStream::Shutdown(grpc_error* error) { + GRPC_ERROR_UNREF(shutdown_error_); + shutdown_error_ = error; } -static void slice_buffer_stream_destroy(grpc_byte_stream* byte_stream) { - grpc_slice_buffer_stream* stream = - reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream); - grpc_slice_buffer_destroy(&stream->backing_buffer); - GRPC_ERROR_UNREF(stream->shutdown_error); +// +// ByteStreamCache +// + +ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream) + : underlying_stream_(std::move(underlying_stream)), + length_(underlying_stream_->length()), + flags_(underlying_stream_->flags()) { + grpc_slice_buffer_init(&cache_buffer_); } -static const grpc_byte_stream_vtable slice_buffer_stream_vtable = { - slice_buffer_stream_next, slice_buffer_stream_pull, - slice_buffer_stream_shutdown, slice_buffer_stream_destroy}; +ByteStreamCache::~ByteStreamCache() { Destroy(); } -void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream, - grpc_slice_buffer* slice_buffer, - uint32_t flags) { - GPR_ASSERT(slice_buffer->length <= UINT32_MAX); - stream->base.length = static_cast<uint32_t>(slice_buffer->length); - stream->base.flags = flags; - stream->base.vtable = &slice_buffer_stream_vtable; - grpc_slice_buffer_init(&stream->backing_buffer); - grpc_slice_buffer_swap(slice_buffer, &stream->backing_buffer); - stream->cursor = 0; - stream->shutdown_error = GRPC_ERROR_NONE; +void ByteStreamCache::Destroy() { + underlying_stream_.reset(); + if (cache_buffer_.length > 0) { + grpc_slice_buffer_destroy_internal(&cache_buffer_); + } } -// grpc_caching_byte_stream +// +// ByteStreamCache::CachingByteStream +// -void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache, - grpc_byte_stream* underlying_stream) { - cache->underlying_stream = underlying_stream; - grpc_slice_buffer_init(&cache->cache_buffer); -} +ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache) + : ByteStream(cache->length_, cache->flags_), cache_(cache) {} -void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache* cache) { - grpc_byte_stream_destroy(cache->underlying_stream); - grpc_slice_buffer_destroy_internal(&cache->cache_buffer); +ByteStreamCache::CachingByteStream::~CachingByteStream() {} + +void ByteStreamCache::CachingByteStream::Orphan() { + GRPC_ERROR_UNREF(shutdown_error_); + // Note: We do not actually delete the object here, since + // CachingByteStream is usually allocated as part of a larger + // object and has an OrphanablePtr of itself passed down through the + // filter stack. } -static bool caching_byte_stream_next(grpc_byte_stream* byte_stream, - size_t max_size_hint, - grpc_closure* on_complete) { - grpc_caching_byte_stream* stream = - reinterpret_cast<grpc_caching_byte_stream*>(byte_stream); - if (stream->shutdown_error != GRPC_ERROR_NONE) return true; - if (stream->cursor < stream->cache->cache_buffer.count) return true; - return grpc_byte_stream_next(stream->cache->underlying_stream, max_size_hint, - on_complete); +bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint, + grpc_closure* on_complete) { + if (shutdown_error_ != GRPC_ERROR_NONE) return true; + if (cursor_ < cache_->cache_buffer_.count) return true; + GPR_ASSERT(cache_->underlying_stream_ != nullptr); + return cache_->underlying_stream_->Next(max_size_hint, on_complete); } -static grpc_error* caching_byte_stream_pull(grpc_byte_stream* byte_stream, - grpc_slice* slice) { - grpc_caching_byte_stream* stream = - reinterpret_cast<grpc_caching_byte_stream*>(byte_stream); - if (stream->shutdown_error != GRPC_ERROR_NONE) { - return GRPC_ERROR_REF(stream->shutdown_error); +grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) { + if (shutdown_error_ != GRPC_ERROR_NONE) { + return GRPC_ERROR_REF(shutdown_error_); } - if (stream->cursor < stream->cache->cache_buffer.count) { - *slice = grpc_slice_ref_internal( - stream->cache->cache_buffer.slices[stream->cursor]); - ++stream->cursor; + if (cursor_ < cache_->cache_buffer_.count) { + *slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]); + ++cursor_; + offset_ += GRPC_SLICE_LENGTH(*slice); return GRPC_ERROR_NONE; } - grpc_error* error = - grpc_byte_stream_pull(stream->cache->underlying_stream, slice); + GPR_ASSERT(cache_->underlying_stream_ != nullptr); + grpc_error* error = cache_->underlying_stream_->Pull(slice); if (error == GRPC_ERROR_NONE) { - ++stream->cursor; - grpc_slice_buffer_add(&stream->cache->cache_buffer, + grpc_slice_buffer_add(&cache_->cache_buffer_, grpc_slice_ref_internal(*slice)); + ++cursor_; + offset_ += GRPC_SLICE_LENGTH(*slice); + // Orphan the underlying stream if it's been drained. + if (offset_ == cache_->underlying_stream_->length()) { + cache_->underlying_stream_.reset(); + } } return error; } -static void caching_byte_stream_shutdown(grpc_byte_stream* byte_stream, - grpc_error* error) { - grpc_caching_byte_stream* stream = - reinterpret_cast<grpc_caching_byte_stream*>(byte_stream); - GRPC_ERROR_UNREF(stream->shutdown_error); - stream->shutdown_error = GRPC_ERROR_REF(error); - grpc_byte_stream_shutdown(stream->cache->underlying_stream, error); -} - -static void caching_byte_stream_destroy(grpc_byte_stream* byte_stream) { - grpc_caching_byte_stream* stream = - reinterpret_cast<grpc_caching_byte_stream*>(byte_stream); - GRPC_ERROR_UNREF(stream->shutdown_error); +void ByteStreamCache::CachingByteStream::Shutdown(grpc_error* error) { + GRPC_ERROR_UNREF(shutdown_error_); + shutdown_error_ = GRPC_ERROR_REF(error); + if (cache_->underlying_stream_ != nullptr) { + cache_->underlying_stream_->Shutdown(error); + } } -static const grpc_byte_stream_vtable caching_byte_stream_vtable = { - caching_byte_stream_next, caching_byte_stream_pull, - caching_byte_stream_shutdown, caching_byte_stream_destroy}; - -void grpc_caching_byte_stream_init(grpc_caching_byte_stream* stream, - grpc_byte_stream_cache* cache) { - memset(stream, 0, sizeof(*stream)); - stream->base.length = cache->underlying_stream->length; - stream->base.flags = cache->underlying_stream->flags; - stream->base.vtable = &caching_byte_stream_vtable; - stream->cache = cache; - stream->shutdown_error = GRPC_ERROR_NONE; +void ByteStreamCache::CachingByteStream::Reset() { + cursor_ = 0; + offset_ = 0; } -void grpc_caching_byte_stream_reset(grpc_caching_byte_stream* stream) { - stream->cursor = 0; -} +} // namespace grpc_core diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 4d3c3c131b..eff832515d 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -22,7 +22,9 @@ #include <grpc/support/port_platform.h> #include <grpc/slice_buffer.h> -#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/iomgr/closure.h" /** Internal bit flag for grpc_begin_message's \a flags signaling the use of * compression for the message */ @@ -30,71 +32,82 @@ /** Mask of all valid internal flags. */ #define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS) -typedef struct grpc_byte_stream grpc_byte_stream; - -typedef struct { - bool (*next)(grpc_byte_stream* byte_stream, size_t max_size_hint, - grpc_closure* on_complete); - grpc_error* (*pull)(grpc_byte_stream* byte_stream, grpc_slice* slice); - void (*shutdown)(grpc_byte_stream* byte_stream, grpc_error* error); - void (*destroy)(grpc_byte_stream* byte_stream); -} grpc_byte_stream_vtable; - -struct grpc_byte_stream { - uint32_t length; - uint32_t flags; - const grpc_byte_stream_vtable* vtable; +namespace grpc_core { + +class ByteStream : public Orphanable { + public: + virtual ~ByteStream() {} + + // Returns true if the bytes are available immediately (in which case + // on_complete will not be called), or false if the bytes will be available + // asynchronously (in which case on_complete will be called when they + // are available). + // + // max_size_hint can be set as a hint as to the maximum number + // of bytes that would be acceptable to read. + virtual bool Next(size_t max_size_hint, + grpc_closure* on_complete) GRPC_ABSTRACT; + + // Returns the next slice in the byte stream when it is available, as + // indicated by Next(). + // + // Once a slice is returned into *slice, it is owned by the caller. + virtual grpc_error* Pull(grpc_slice* slice) GRPC_ABSTRACT; + + // Shuts down the byte stream. + // + // If there is a pending call to on_complete from Next(), it will be + // invoked with the error passed to Shutdown(). + // + // The next call to Pull() (if any) will return the error passed to + // Shutdown(). + virtual void Shutdown(grpc_error* error) GRPC_ABSTRACT; + + uint32_t length() const { return length_; } + uint32_t flags() const { return flags_; } + + void set_flags(uint32_t flags) { flags_ = flags; } + + GRPC_ABSTRACT_BASE_CLASS + + protected: + ByteStream(uint32_t length, uint32_t flags) + : length_(length), flags_(flags) {} + + private: + const uint32_t length_; + uint32_t flags_; }; -// Returns true if the bytes are available immediately (in which case -// on_complete will not be called), false if the bytes will be available -// asynchronously. // -// max_size_hint can be set as a hint as to the maximum number -// of bytes that would be acceptable to read. -bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint, - grpc_closure* on_complete); - -// Returns the next slice in the byte stream when it is ready (indicated by -// either grpc_byte_stream_next returning true or on_complete passed to -// grpc_byte_stream_next is called). +// SliceBufferByteStream // -// Once a slice is returned into *slice, it is owned by the caller. -grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream, - grpc_slice* slice); - -// Shuts down the byte stream. +// A ByteStream that wraps a slice buffer. // -// If there is a pending call to on_complete from grpc_byte_stream_next(), -// it will be invoked with the error passed to grpc_byte_stream_shutdown(). -// -// The next call to grpc_byte_stream_pull() (if any) will return the error -// passed to grpc_byte_stream_shutdown(). -void grpc_byte_stream_shutdown(grpc_byte_stream* byte_stream, - grpc_error* error); -void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream); +class SliceBufferByteStream : public ByteStream { + public: + // Removes all slices in slice_buffer, leaving it empty. + SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags); + + ~SliceBufferByteStream(); + + void Orphan() override; + + bool Next(size_t max_size_hint, grpc_closure* on_complete) override; + grpc_error* Pull(grpc_slice* slice) override; + void Shutdown(grpc_error* error) override; + + private: + grpc_slice_buffer backing_buffer_; + size_t cursor_ = 0; + grpc_error* shutdown_error_ = GRPC_ERROR_NONE; +}; -// grpc_slice_buffer_stream // -// A grpc_byte_stream that wraps a slice buffer. The stream takes -// ownership of the slices in the buffer, and on destruction will -// reset the contents of the buffer. - -typedef struct grpc_slice_buffer_stream { - grpc_byte_stream base; - grpc_slice_buffer backing_buffer; - size_t cursor; - grpc_error* shutdown_error; -} grpc_slice_buffer_stream; - -void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream, - grpc_slice_buffer* slice_buffer, - uint32_t flags); - -// grpc_caching_byte_stream +// CachingByteStream // -// A grpc_byte_stream that that wraps an underlying byte stream but caches +// A ByteStream that that wraps an underlying byte stream but caches // the resulting slices in a slice buffer. If an initial attempt fails // without fully draining the underlying stream, a new caching stream // can be created from the same underlying cache, in which case it will @@ -102,32 +115,50 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream, // underlying stream. // // NOTE: No synchronization is done, so it is not safe to have multiple -// grpc_caching_byte_streams simultaneously drawing from the same underlying -// grpc_byte_stream_cache at the same time. +// CachingByteStreams simultaneously drawing from the same underlying +// ByteStreamCache at the same time. +// + +class ByteStreamCache { + public: + class CachingByteStream : public ByteStream { + public: + explicit CachingByteStream(ByteStreamCache* cache); + + ~CachingByteStream(); + + void Orphan() override; -typedef struct { - grpc_byte_stream* underlying_stream; - grpc_slice_buffer cache_buffer; -} grpc_byte_stream_cache; + bool Next(size_t max_size_hint, grpc_closure* on_complete) override; + grpc_error* Pull(grpc_slice* slice) override; + void Shutdown(grpc_error* error) override; -// Takes ownership of underlying_stream. -void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache, - grpc_byte_stream* underlying_stream); + // Resets the byte stream to the start of the underlying stream. + void Reset(); -// Must not be called while still in use by a grpc_caching_byte_stream. -void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache* cache); + private: + ByteStreamCache* cache_; + size_t cursor_ = 0; + size_t offset_ = 0; + grpc_error* shutdown_error_ = GRPC_ERROR_NONE; + }; -typedef struct { - grpc_byte_stream base; - grpc_byte_stream_cache* cache; - size_t cursor; - grpc_error* shutdown_error; -} grpc_caching_byte_stream; + explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream); -void grpc_caching_byte_stream_init(grpc_caching_byte_stream* stream, - grpc_byte_stream_cache* cache); + ~ByteStreamCache(); + + // Must not be destroyed while still in use by a CachingByteStream. + void Destroy(); + + grpc_slice_buffer* cache_buffer() { return &cache_buffer_; } + + private: + OrphanablePtr<ByteStream> underlying_stream_; + uint32_t length_; + uint32_t flags_; + grpc_slice_buffer cache_buffer_; +}; -// Resets the byte stream to the start of the underlying stream. -void grpc_caching_byte_stream_reset(grpc_caching_byte_stream* stream); +} // namespace grpc_core #endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */ diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc index 0122e773ca..db6b6c0444 100644 --- a/src/core/lib/transport/connectivity_state.cc +++ b/src/core/lib/transport/connectivity_state.cc @@ -78,7 +78,7 @@ grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state cur = static_cast<grpc_connectivity_state>( gpr_atm_no_barrier_load(&tracker->current_state_atm)); if (grpc_connectivity_state_trace.enabled()) { - gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name, + gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name, grpc_connectivity_state_name(cur)); } return cur; @@ -89,7 +89,7 @@ grpc_connectivity_state grpc_connectivity_state_get( grpc_connectivity_state cur = static_cast<grpc_connectivity_state>( gpr_atm_no_barrier_load(&tracker->current_state_atm)); if (grpc_connectivity_state_trace.enabled()) { - gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name, + gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name, grpc_connectivity_state_name(cur)); } if (error != nullptr) { @@ -110,10 +110,10 @@ bool grpc_connectivity_state_notify_on_state_change( gpr_atm_no_barrier_load(&tracker->current_state_atm)); if (grpc_connectivity_state_trace.enabled()) { if (current == nullptr) { - gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", tracker, + gpr_log(GPR_INFO, "CONWATCH: %p %s: unsubscribe notify=%p", tracker, tracker->name, notify); } else { - gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker, + gpr_log(GPR_INFO, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker, tracker->name, grpc_connectivity_state_name(*current), grpc_connectivity_state_name(cur), notify); } @@ -161,7 +161,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker, grpc_connectivity_state_watcher* w; if (grpc_connectivity_state_trace.enabled()) { const char* error_string = grpc_error_string(error); - gpr_log(GPR_DEBUG, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker, + gpr_log(GPR_INFO, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker, tracker->name, grpc_connectivity_state_name(cur), grpc_connectivity_state_name(state), reason, error, error_string); } @@ -187,8 +187,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker, *w->current = state; tracker->watchers = w->next; if (grpc_connectivity_state_trace.enabled()) { - gpr_log(GPR_DEBUG, "NOTIFY: %p %s: %p", tracker, tracker->name, - w->notify); + gpr_log(GPR_INFO, "NOTIFY: %p %s: %p", tracker, tracker->name, w->notify); } GRPC_CLOSURE_SCHED(w->notify, GRPC_ERROR_REF(tracker->current_error)); gpr_free(w); diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index 421db5aa39..ecb083cfc2 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -23,7 +23,7 @@ #include <grpc/grpc.h> #include "src/core/lib/debug/trace.h" -#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/closure.h" typedef struct grpc_connectivity_state_watcher { /** we keep watchers in a linked list */ diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h index 5e0ecbdb96..78df4bc3a3 100644 --- a/src/core/lib/transport/metadata.h +++ b/src/core/lib/transport/metadata.h @@ -24,8 +24,8 @@ #include <grpc/grpc.h> #include <grpc/slice.h> +#include "src/core/lib/debug/trace.h" #include "src/core/lib/gpr/useful.h" -#include "src/core/lib/iomgr/exec_ctx.h" extern grpc_core::DebugOnlyTraceFlag grpc_trace_metadata; diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h index 3876063b52..7068750b6f 100644 --- a/src/core/lib/transport/metadata_batch.h +++ b/src/core/lib/transport/metadata_batch.h @@ -26,6 +26,7 @@ #include <grpc/grpc.h> #include <grpc/slice.h> #include <grpc/support/time.h> +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/static_metadata.h" diff --git a/src/core/lib/transport/status_conversion.h b/src/core/lib/transport/status_conversion.h index 9f14e9bee0..487f00c08b 100644 --- a/src/core/lib/transport/status_conversion.h +++ b/src/core/lib/transport/status_conversion.h @@ -22,6 +22,7 @@ #include <grpc/support/port_platform.h> #include <grpc/grpc.h> + #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/http2_errors.h" diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index c90d16fc32..6b41e4b37e 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -209,7 +209,7 @@ void grpc_transport_stream_op_batch_finish_with_failure( grpc_transport_stream_op_batch* batch, grpc_error* error, grpc_call_combiner* call_combiner) { if (batch->send_message) { - grpc_byte_stream_destroy(batch->payload->send_message.send_message); + batch->payload->send_message.send_message.reset(); } if (batch->recv_message) { GRPC_CALL_COMBINER_START( diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 37e50344c4..10e9df0f7c 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -184,11 +184,10 @@ struct grpc_transport_stream_op_batch_payload { struct { // The transport (or a filter that decides to return a failure before - // the op gets down to the transport) is responsible for calling - // grpc_byte_stream_destroy() on this. + // the op gets down to the transport) takes ownership. // The batch's on_complete will not be called until after the byte - // stream is destroyed. - grpc_byte_stream* send_message; + // stream is orphaned. + grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message; } send_message; struct { @@ -216,10 +215,8 @@ struct grpc_transport_stream_op_batch_payload { struct { // Will be set by the transport to point to the byte stream // containing a received message. - // The caller is responsible for calling grpc_byte_stream_destroy() - // on this byte stream. // Will be NULL if trailing metadata is received instead of a message. - grpc_byte_stream** recv_message; + grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; /** Should be enqueued when one message is ready to be processed. */ grpc_closure* recv_message_ready; } recv_message; diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc index 6898da17ed..99af7c1931 100644 --- a/src/core/lib/transport/transport_op_string.cc +++ b/src/core/lib/transport/transport_op_string.cc @@ -75,9 +75,16 @@ char* grpc_transport_stream_op_batch_string( if (op->send_message) { gpr_strvec_add(&b, gpr_strdup(" ")); - gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d", - op->payload->send_message.send_message->flags, - op->payload->send_message.send_message->length); + if (op->payload->send_message.send_message != nullptr) { + gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d", + op->payload->send_message.send_message->flags(), + op->payload->send_message.send_message->length()); + } else { + // This can happen when we check a batch after the transport has + // processed and cleared the send_message op. + tmp = + gpr_strdup("SEND_MESSAGE(flag and length unknown, already orphaned)"); + } gpr_strvec_add(&b, tmp); } |