diff options
author | Mark D. Roth <roth@google.com> | 2018-04-09 13:05:35 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-04-09 13:05:35 -0700 |
commit | 03f01fd9aee0e5dc918f324b3ac084468c6b630f (patch) | |
tree | 660c1e59740702a74fa08338936967b6d35cf29a /src | |
parent | c307805fe7f5dc38e1219cc4cc8a0d4a64f5d648 (diff) | |
parent | efcd45bc246425c13bc702cbd182a7a11062f190 (diff) |
Merge pull request #14845 from markdroth/inlined_vector
Change InlinedVector to keep elements stored contiguously.
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 27 | ||||
-rw-r--r-- | src/core/lib/gprpp/inlined_vector.h | 64 |
2 files changed, 46 insertions, 45 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 51f9ae000a..a10bfea8b1 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -924,7 +924,9 @@ typedef struct client_channel_call_data { // Note: We inline the cache for the first 3 send_message ops and use // dynamic allocation after that. This number was essentially picked // at random; it could be changed in the future to tune performance. - grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages; + grpc_core::ManualConstructor< + grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>> + send_messages; // send_trailing_metadata bool seen_send_trailing_metadata; grpc_linked_mdelem* send_trailing_metadata_storage; @@ -974,7 +976,7 @@ static void maybe_cache_send_ops_for_batch(call_data* calld, gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); new (cache) grpc_core::ByteStreamCache( std::move(batch->payload->send_message.send_message)); - calld->send_messages.push_back(cache); + calld->send_messages->push_back(cache); } // Save metadata batch for send_trailing_metadata ops. if (batch->send_trailing_metadata) { @@ -1008,7 +1010,7 @@ static void free_cached_send_op_data_after_commit( "]", chand, calld, i); } - calld->send_messages[i]->Destroy(); + (*calld->send_messages)[i]->Destroy(); } if (retry_state->completed_send_trailing_metadata) { grpc_metadata_batch_destroy(&calld->send_trailing_metadata); @@ -1032,7 +1034,7 @@ static void free_cached_send_op_data_for_completed_batch( "]", chand, calld, retry_state->completed_send_message_count - 1); } - calld->send_messages[retry_state->completed_send_message_count - 1] + (*calld->send_messages)[retry_state->completed_send_message_count - 1] ->Destroy(); } if (batch_data->batch.send_trailing_metadata) { @@ -1280,7 +1282,8 @@ static bool pending_batch_is_completed( return false; } if (pending->batch->send_message && - retry_state->completed_send_message_count < calld->send_messages.size()) { + retry_state->completed_send_message_count < + calld->send_messages->size()) { return false; } if (pending->batch->send_trailing_metadata && @@ -1315,7 +1318,7 @@ static bool pending_batch_is_unstarted( return true; } if (pending->batch->send_message && - retry_state->started_send_message_count < calld->send_messages.size()) { + retry_state->started_send_message_count < calld->send_messages->size()) { return true; } if (pending->batch->send_trailing_metadata && @@ -1817,7 +1820,7 @@ static void add_closures_for_replay_or_pending_send_ops( channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); bool have_pending_send_message_ops = - retry_state->started_send_message_count < calld->send_messages.size(); + retry_state->started_send_message_count < calld->send_messages->size(); bool have_pending_send_trailing_metadata_op = calld->seen_send_trailing_metadata && !retry_state->started_send_trailing_metadata; @@ -2133,7 +2136,7 @@ static void add_retriable_send_message_op( chand, calld, retry_state->started_send_message_count); } grpc_core::ByteStreamCache* cache = - calld->send_messages[retry_state->started_send_message_count]; + (*calld->send_messages)[retry_state->started_send_message_count]; ++retry_state->started_send_message_count; batch_data->send_message.Init(cache); batch_data->batch.send_message = true; @@ -2254,7 +2257,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( } // send_message. // Note that we can only have one send_message op in flight at a time. - if (retry_state->started_send_message_count < calld->send_messages.size() && + if (retry_state->started_send_message_count < calld->send_messages->size() && retry_state->started_send_message_count == retry_state->completed_send_message_count && !calld->pending_send_message) { @@ -2274,7 +2277,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( // to start, since we can't send down any more send_message ops after // send_trailing_metadata. if (calld->seen_send_trailing_metadata && - retry_state->started_send_message_count == calld->send_messages.size() && + retry_state->started_send_message_count == calld->send_messages->size() && !retry_state->started_send_trailing_metadata && !calld->pending_send_trailing_metadata) { if (grpc_client_channel_trace.enabled()) { @@ -2325,7 +2328,7 @@ static void add_subchannel_batches_for_pending_batches( // send_message ops after send_trailing_metadata. if (batch->send_trailing_metadata && (retry_state->started_send_message_count + batch->send_message < - calld->send_messages.size() || + calld->send_messages->size() || retry_state->started_send_trailing_metadata)) { continue; } @@ -2976,6 +2979,7 @@ static grpc_error* cc_init_call_elem(grpc_call_element* elem, calld->deadline); } calld->enable_retries = chand->enable_retries; + calld->send_messages.Init(); return GRPC_ERROR_NONE; } @@ -3011,6 +3015,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem, calld->pick.subchannel_call_context[i].value); } } + calld->send_messages.Destroy(); GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); } diff --git a/src/core/lib/gprpp/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h index ca95aecddc..f36f6cb706 100644 --- a/src/core/lib/gprpp/inlined_vector.h +++ b/src/core/lib/gprpp/inlined_vector.h @@ -54,43 +54,43 @@ class InlinedVector { InlinedVector(const InlinedVector&) = delete; InlinedVector& operator=(const InlinedVector&) = delete; + T* data() { + return dynamic_ != nullptr ? dynamic_ : reinterpret_cast<T*>(inline_); + } + + const T* data() const { + return dynamic_ != nullptr ? dynamic_ : reinterpret_cast<const T*>(inline_); + } + T& operator[](size_t offset) { assert(offset < size_); - if (offset < N) { - return *reinterpret_cast<T*>(inline_ + offset); - } else { - return dynamic_[offset - N]; - } + return data()[offset]; } const T& operator[](size_t offset) const { assert(offset < size_); - if (offset < N) { - return *reinterpret_cast<const T*>(inline_ + offset); - } else { - return dynamic_[offset - N]; + return data()[offset]; + } + + void reserve(size_t capacity) { + if (capacity > capacity_) { + T* new_dynamic = static_cast<T*>(gpr_malloc(sizeof(T) * capacity)); + for (size_t i = 0; i < size_; ++i) { + new (&new_dynamic[i]) T(std::move(data()[i])); + data()[i].~T(); + } + gpr_free(dynamic_); + dynamic_ = new_dynamic; + capacity_ = capacity; } } template <typename... Args> void emplace_back(Args&&... args) { - if (size_ < N) { - new (&inline_[size_]) T(std::forward<Args>(args)...); - } else { - if (size_ - N == dynamic_capacity_) { - size_t new_capacity = - dynamic_capacity_ == 0 ? 2 : dynamic_capacity_ * 2; - T* new_dynamic = static_cast<T*>(gpr_malloc(sizeof(T) * new_capacity)); - for (size_t i = 0; i < dynamic_capacity_; ++i) { - new (&new_dynamic[i]) T(std::move(dynamic_[i])); - dynamic_[i].~T(); - } - gpr_free(dynamic_); - dynamic_ = new_dynamic; - dynamic_capacity_ = new_capacity; - } - new (&dynamic_[size_ - N]) T(std::forward<Args>(args)...); + if (size_ == capacity_) { + reserve(capacity_ * 2); } + new (&(data()[size_])) T(std::forward<Args>(args)...); ++size_; } @@ -99,6 +99,7 @@ class InlinedVector { void push_back(T&& value) { emplace_back(std::move(value)); } size_t size() const { return size_; } + size_t capacity() const { return capacity_; } void clear() { destroy_elements(); @@ -109,26 +110,21 @@ class InlinedVector { void init_data() { dynamic_ = nullptr; size_ = 0; - dynamic_capacity_ = 0; + capacity_ = N; } void destroy_elements() { - for (size_t i = 0; i < size_ && i < N; ++i) { - T& value = *reinterpret_cast<T*>(inline_ + i); + for (size_t i = 0; i < size_; ++i) { + T& value = data()[i]; value.~T(); } - if (size_ > N) { // Avoid subtracting two signed values. - for (size_t i = 0; i < size_ - N; ++i) { - dynamic_[i].~T(); - } - } gpr_free(dynamic_); } typename std::aligned_storage<sizeof(T)>::type inline_[N]; T* dynamic_; size_t size_; - size_t dynamic_capacity_; + size_t capacity_; }; } // namespace grpc_core |