aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/filters/census/grpc_context.cc38
-rw-r--r--src/core/ext/filters/client_channel/README.md18
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc2
-rw-r--r--src/core/ext/filters/client_channel/client_channel_plugin.cc2
-rw-r--r--src/core/ext/filters/client_channel/http_connect_handshaker.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc5
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc146
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc10
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h9
-rw-r--r--src/core/ext/filters/client_channel/resolver.h13
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc12
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc12
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc18
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h3
-rw-r--r--src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc7
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc9
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.cc4
-rw-r--r--src/core/ext/filters/http/client_authority_filter.cc14
-rw-r--r--src/core/ext/filters/http/http_filters_plugin.cc27
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.cc10
-rw-r--r--src/core/ext/filters/max_age/max_age_filter.cc2
-rw-r--r--src/core/ext/filters/message_size/message_size_filter.cc6
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc2
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc2
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc27
-rw-r--r--src/core/ext/transport/chttp2/transport/flow_control.cc17
26 files changed, 213 insertions, 204 deletions
diff --git a/src/core/ext/filters/census/grpc_context.cc b/src/core/ext/filters/census/grpc_context.cc
new file mode 100644
index 0000000000..599a798dda
--- /dev/null
+++ b/src/core/ext/filters/census/grpc_context.cc
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/census.h>
+#include <grpc/grpc.h>
+#include "src/core/lib/surface/api_trace.h"
+#include "src/core/lib/surface/call.h"
+
+void grpc_census_call_set_context(grpc_call* call, census_context* context) {
+ GRPC_API_TRACE("grpc_census_call_set_context(call=%p, census_context=%p)", 2,
+ (call, context));
+ if (context != nullptr) {
+ grpc_call_context_set(call, GRPC_CONTEXT_TRACING, context, nullptr);
+ }
+}
+
+census_context* grpc_census_call_get_context(grpc_call* call) {
+ GRPC_API_TRACE("grpc_census_call_get_context(call=%p)", 1, (call));
+ return static_cast<census_context*>(
+ grpc_call_context_get(call, GRPC_CONTEXT_TRACING));
+}
diff --git a/src/core/ext/filters/client_channel/README.md b/src/core/ext/filters/client_channel/README.md
index 7c209db12e..9676a4535b 100644
--- a/src/core/ext/filters/client_channel/README.md
+++ b/src/core/ext/filters/client_channel/README.md
@@ -46,20 +46,4 @@ construction arguments for concrete grpc_subchannel instances.
Naming for GRPC
===============
-Names in GRPC are represented by a URI (as defined in
-[RFC 3986](https://tools.ietf.org/html/rfc3986)).
-
-The following schemes are currently supported:
-
-dns:///host:port - dns schemes are currently supported so long as authority is
- empty (authority based dns resolution is expected in a future
- release)
-
-unix:path - the unix scheme is used to create and connect to unix domain
- sockets - the authority must be empty, and the path
- represents the absolute or relative path to the desired
- socket
-
-ipv4:host:port - a pre-resolved ipv4 dotted decimal address/port combination
-
-ipv6:[host]:port - a pre-resolved ipv6 address/port combination
+See [/doc/naming.md](gRPC name resolution).
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index d3a4c49821..d015ceb335 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -3190,7 +3190,7 @@ static void cc_start_transport_stream_op_batch(
// For all other batches, release the call combiner.
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
- "chand=%p calld=%p: saved batch, yeilding call combiner", chand,
+ "chand=%p calld=%p: saved batch, yielding call combiner", chand,
calld);
}
GRPC_CALL_COMBINER_STOP(calld->call_combiner,
diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.cc b/src/core/ext/filters/client_channel/client_channel_plugin.cc
index 71da648660..e0784b7e5c 100644
--- a/src/core/ext/filters/client_channel/client_channel_plugin.cc
+++ b/src/core/ext/filters/client_channel/client_channel_plugin.cc
@@ -56,7 +56,7 @@ void grpc_client_channel_init(void) {
grpc_register_http_proxy_mapper();
grpc_subchannel_index_init();
grpc_channel_init_register_stage(
- GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, append_filter,
+ GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter,
(void*)&grpc_client_channel_filter);
grpc_http_connect_register_handshaker_factory();
}
diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
index 4e8b8b71db..7ce8da8c00 100644
--- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc
+++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
@@ -320,7 +320,7 @@ static void http_connect_handshaker_do_handshake(
// Take a new ref to be held by the write callback.
gpr_ref(&handshaker->refcount);
grpc_endpoint_write(args->endpoint, &handshaker->write_buffer,
- &handshaker->request_done_closure);
+ &handshaker->request_done_closure, nullptr);
gpr_mu_unlock(&handshaker->mu);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index b1d97a7016..1ee1925a25 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -92,6 +92,7 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -1259,7 +1260,7 @@ void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* child_channels) {
// delegate to the RoundRobin to fill the children subchannels.
rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
- mu_guard guard(&lb_channel_mu_);
+ MutexLock lock(&lb_channel_mu_);
if (lb_channel_ != nullptr) {
grpc_core::channelz::ChannelNode* channel_node =
grpc_channel_get_channelz_node(lb_channel_);
@@ -1890,7 +1891,7 @@ void grpc_lb_policy_grpclb_init() {
grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
grpc_core::New<grpc_core::GrpcLbFactory>()));
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_LOW,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_client_load_reporting_filter,
(void*)&grpc_client_load_reporting_filter);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 2b6a9ba8c5..602d6e92f9 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -27,6 +27,7 @@
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -80,6 +81,11 @@ class PickFirst : public LoadBalancingPolicy {
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
+
+ // Processes the connectivity change to READY for an unselected subchannel.
+ void ProcessUnselectedReadyLocked();
+
+ void CheckConnectivityStateAndStartWatchingLocked();
};
class PickFirstSubchannelList
@@ -120,7 +126,6 @@ class PickFirst : public LoadBalancingPolicy {
void ShutdownLocked() override;
void StartPickingLocked();
- void DestroyUnselectedSubchannelsLocked();
void UpdateChildRefsLocked();
// All our subchannels.
@@ -244,13 +249,9 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
void PickFirst::StartPickingLocked() {
started_picking_ = true;
- if (subchannel_list_ != nullptr) {
- for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
- if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
- subchannel_list_->subchannel(i)->StartConnectivityWatchLocked();
- break;
- }
- }
+ if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels() > 0) {
+ subchannel_list_->subchannel(0)
+ ->CheckConnectivityStateAndStartWatchingLocked();
}
}
@@ -279,23 +280,14 @@ bool PickFirst::PickLocked(PickState* pick, grpc_error** error) {
"No pick result available but synchronous result required.");
return true;
}
+ pick->next = pending_picks_;
+ pending_picks_ = pick;
if (!started_picking_) {
StartPickingLocked();
}
- pick->next = pending_picks_;
- pending_picks_ = pick;
return false;
}
-void PickFirst::DestroyUnselectedSubchannelsLocked() {
- for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
- PickFirstSubchannelData* sd = subchannel_list_->subchannel(i);
- if (selected_ != sd) {
- sd->UnrefSubchannelLocked("selected_different_subchannel");
- }
- }
-}
-
grpc_connectivity_state PickFirst::CheckConnectivityLocked(grpc_error** error) {
return grpc_connectivity_state_get(&state_tracker_, error);
}
@@ -308,7 +300,7 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
void PickFirst::FillChildRefsForChannelz(
ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
- mu_guard guard(&child_refs_mu_);
+ MutexLock lock(&child_refs_mu_);
for (size_t i = 0; i < child_subchannels_.size(); ++i) {
// TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
// have to implement lightweight set. For now, we don't care about
@@ -335,7 +327,7 @@ void PickFirst::UpdateChildRefsLocked() {
latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
- mu_guard guard(&child_refs_mu_);
+ MutexLock lock(&child_refs_mu_);
child_subchannels_ = std::move(cs);
}
@@ -386,7 +378,8 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// If we've started picking, start trying to connect to the first
// subchannel in the new list.
if (started_picking_) {
- subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
+ subchannel_list_->subchannel(0)
+ ->CheckConnectivityStateAndStartWatchingLocked();
}
} else {
// We do have a selected subchannel.
@@ -411,7 +404,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
selected_ = sd;
subchannel_list_ = std::move(subchannel_list);
- DestroyUnselectedSubchannelsLocked();
sd->StartConnectivityWatchLocked();
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
@@ -440,7 +432,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// subchannel in the new list.
if (started_picking_) {
latest_pending_subchannel_list_->subchannel(0)
- ->StartConnectivityWatchLocked();
+ ->CheckConnectivityStateAndStartWatchingLocked();
}
}
}
@@ -496,7 +488,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
// In transient failure. Rely on re-resolution to recover.
p->selected_ = nullptr;
- UnrefSubchannelLocked("pf_selected_shutdown");
StopConnectivityWatchLocked();
} else {
grpc_connectivity_state_set(&p->state_tracker_, connectivity_state,
@@ -519,41 +510,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// select in place of the current one.
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
- // Case 2. Promote p->latest_pending_subchannel_list_ to
- // p->subchannel_list_.
- if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO,
- "Pick First %p promoting pending subchannel list %p to "
- "replace %p",
- p, p->latest_pending_subchannel_list_.get(),
- p->subchannel_list_.get());
- }
- p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
- }
- // Cases 1 and 2.
- grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
- GRPC_ERROR_NONE, "connecting_ready");
- p->selected_ = this;
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
- subchannel());
- }
- // Drop all other subchannels, since we are now connected.
- p->DestroyUnselectedSubchannelsLocked();
- // Update any calls that were waiting for a pick.
- PickState* pick;
- while ((pick = p->pending_picks_)) {
- p->pending_picks_ = pick->next;
- pick->connected_subchannel =
- p->selected_->connected_subchannel()->Ref();
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO,
- "Servicing pending pick with selected subchannel %p",
- p->selected_->subchannel());
- }
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
- }
+ ProcessUnselectedReadyLocked();
// Renew notification.
RenewConnectivityWatchLocked();
break;
@@ -561,11 +518,9 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
StopConnectivityWatchLocked();
PickFirstSubchannelData* sd = this;
- do {
- size_t next_index =
- (sd->Index() + 1) % subchannel_list()->num_subchannels();
- sd = subchannel_list()->subchannel(next_index);
- } while (sd->subchannel() == nullptr);
+ size_t next_index =
+ (sd->Index() + 1) % subchannel_list()->num_subchannels();
+ sd = subchannel_list()->subchannel(next_index);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
@@ -574,7 +529,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "exhausted_subchannels");
}
- sd->StartConnectivityWatchLocked();
+ sd->CheckConnectivityStateAndStartWatchingLocked();
break;
}
case GRPC_CHANNEL_CONNECTING:
@@ -595,6 +550,65 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
GRPC_ERROR_UNREF(error);
}
+void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
+ PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
+ // If we get here, there are two possible cases:
+ // 1. We do not currently have a selected subchannel, and the update is
+ // for a subchannel in p->subchannel_list_ that we're trying to
+ // connect to. The goal here is to find a subchannel that we can
+ // select.
+ // 2. We do currently have a selected subchannel, and the update is
+ // for a subchannel in p->latest_pending_subchannel_list_. The
+ // goal here is to find a subchannel from the update that we can
+ // select in place of the current one.
+ GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
+ subchannel_list() == p->latest_pending_subchannel_list_.get());
+ // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
+ if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "Pick First %p promoting pending subchannel list %p to "
+ "replace %p",
+ p, p->latest_pending_subchannel_list_.get(),
+ p->subchannel_list_.get());
+ }
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
+ }
+ // Cases 1 and 2.
+ grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
+ GRPC_ERROR_NONE, "subchannel_ready");
+ p->selected_ = this;
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
+ }
+ // Update any calls that were waiting for a pick.
+ PickState* pick;
+ while ((pick = p->pending_picks_)) {
+ p->pending_picks_ = pick->next;
+ pick->connected_subchannel = p->selected_->connected_subchannel()->Ref();
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p",
+ p->selected_->subchannel());
+ }
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ }
+}
+
+void PickFirst::PickFirstSubchannelData::
+ CheckConnectivityStateAndStartWatchingLocked() {
+ PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (p->selected_ != this &&
+ CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
+ // We must process the READY subchannel before we start watching it.
+ // Otherwise, we won't know it's READY because we will be waiting for its
+ // connectivity state to change from READY.
+ ProcessUnselectedReadyLocked();
+ }
+ GRPC_ERROR_UNREF(error);
+ StartConnectivityWatchLocked();
+}
+
//
// factory
//
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index fea84331d8..4195c1e9d1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -36,6 +36,7 @@
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -138,7 +139,8 @@ class RoundRobin : public LoadBalancingPolicy {
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args)
: SubchannelList(policy, tracer, addresses, combiner,
- client_channel_factory, args) {
+ client_channel_factory, args),
+ last_ready_index_(num_subchannels() - 1) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
@@ -179,7 +181,7 @@ class RoundRobin : public LoadBalancingPolicy {
size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0;
grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE;
- size_t last_ready_index_ = -1; // Index into list of last pick.
+ size_t last_ready_index_; // Index into list of last pick.
};
// Helper class to ensure that any function that modifies the child refs
@@ -400,7 +402,7 @@ bool RoundRobin::PickLocked(PickState* pick, grpc_error** error) {
void RoundRobin::FillChildRefsForChannelz(
ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
- mu_guard guard(&child_refs_mu_);
+ MutexLock lock(&child_refs_mu_);
for (size_t i = 0; i < child_subchannels_.size(); ++i) {
// TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
// have to implement lightweight set. For now, we don't care about
@@ -427,7 +429,7 @@ void RoundRobin::UpdateChildRefsLocked() {
latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
- mu_guard guard(&child_refs_mu_);
+ MutexLock lock(&child_refs_mu_);
child_subchannels_ = std::move(cs);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index d4e51c584a..d87de51082 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -102,11 +102,6 @@ class SubchannelData {
return pending_connectivity_state_unsafe_;
}
- // Unrefs the subchannel. May be used if an individual subchannel is
- // no longer needed even though the subchannel list as a whole is not
- // being unreffed.
- virtual void UnrefSubchannelLocked(const char* reason);
-
// Resets the connection backoff.
// TODO(roth): This method should go away when we move the backoff
// code out of the subchannel and into the LB policies.
@@ -154,6 +149,10 @@ class SubchannelData {
grpc_connectivity_state connectivity_state,
grpc_error* error) GRPC_ABSTRACT;
+ // Unrefs the subchannel. May be overridden by subclasses that need
+ // to perform extra cleanup when unreffing the subchannel.
+ virtual void UnrefSubchannelLocked(const char* reason);
+
private:
// Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
// Returns true if the connectivity state should be reported.
diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h
index 48f2e89095..e9acbb7c41 100644
--- a/src/core/ext/filters/client_channel/resolver.h
+++ b/src/core/ext/filters/client_channel/resolver.h
@@ -81,18 +81,7 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> {
///
/// If this causes new data to become available, then the currently
/// pending call to \a NextLocked() will return the new result.
- ///
- /// Note: Currently, all resolvers are required to return a new result
- /// shortly after this method is called. For pull-based mechanisms, if
- /// the implementation decides to delay querying the name service, it
- /// should immediately return a new copy of the previously returned
- /// result (and it can then return the updated data later, when it
- /// actually does query the name service). For push-based mechanisms,
- /// the implementation should immediately return a new copy of the
- /// last-seen result.
- /// TODO(roth): Remove this requirement once we fix pick_first to not
- /// throw away unselected subchannels.
- virtual void RequestReresolutionLocked() GRPC_ABSTRACT;
+ virtual void RequestReresolutionLocked() {}
/// Resets the re-resolution backoff, if any.
/// This needs to be implemented only by pull-based implementations;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index f2bb5f3c71..dfa52867d8 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -373,13 +373,7 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
void AresDnsResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
- if (have_next_resolution_timer_) {
- // TODO(dgq): remove the following two lines once Pick First stops
- // discarding subchannels after selecting.
- ++resolved_version_;
- MaybeFinishNextLocked();
- return;
- }
+ if (have_next_resolution_timer_) return;
if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_;
@@ -401,10 +395,6 @@ void AresDnsResolver::MaybeStartResolvingLocked() {
self.release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
- // TODO(dgq): remove the following two lines once Pick First stops
- // discarding subchannels after selecting.
- ++resolved_version_;
- MaybeFinishNextLocked();
return;
}
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
index 282caf215c..65ff1ec1a5 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
@@ -247,13 +247,7 @@ void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
void NativeDnsResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
- if (have_next_resolution_timer_) {
- // TODO(dgq): remove the following two lines once Pick First stops
- // discarding subchannels after selecting.
- ++resolved_version_;
- MaybeFinishNextLocked();
- return;
- }
+ if (have_next_resolution_timer_) return;
if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_;
@@ -275,10 +269,6 @@ void NativeDnsResolver::MaybeStartResolvingLocked() {
self.release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
- // TODO(dgq): remove the following two lines once Pick First stops
- // discarding subchannels after selecting.
- ++resolved_version_;
- MaybeFinishNextLocked();
return;
}
}
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
index 99a33f2277..144ac24a56 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
@@ -73,11 +73,6 @@ class FakeResolver : public Resolver {
// Results to use for the pretended re-resolution in
// RequestReresolutionLocked().
grpc_channel_args* reresolution_results_ = nullptr;
- // TODO(juanlishen): This can go away once pick_first is changed to not throw
- // away its subchannels, since that will eliminate its dependence on
- // channel_saw_error_locked() causing an immediate resolver return.
- // A copy of the most-recently used resolution results.
- grpc_channel_args* last_used_results_ = nullptr;
// pending next completion, or NULL
grpc_closure* next_completion_ = nullptr;
// target result address for next completion
@@ -96,7 +91,6 @@ FakeResolver::FakeResolver(const ResolverArgs& args) : Resolver(args.combiner) {
FakeResolver::~FakeResolver() {
grpc_channel_args_destroy(next_results_);
grpc_channel_args_destroy(reresolution_results_);
- grpc_channel_args_destroy(last_used_results_);
grpc_channel_args_destroy(channel_args_);
}
@@ -109,17 +103,11 @@ void FakeResolver::NextLocked(grpc_channel_args** target_result,
}
void FakeResolver::RequestReresolutionLocked() {
- // A resolution must have been returned before an error is seen.
- GPR_ASSERT(last_used_results_ != nullptr);
- grpc_channel_args_destroy(next_results_);
if (reresolution_results_ != nullptr) {
+ grpc_channel_args_destroy(next_results_);
next_results_ = grpc_channel_args_copy(reresolution_results_);
- } else {
- // If reresolution_results is unavailable, re-resolve with the most-recently
- // used results to avoid a no-op re-resolution.
- next_results_ = grpc_channel_args_copy(last_used_results_);
+ MaybeFinishNextLocked();
}
- MaybeFinishNextLocked();
}
void FakeResolver::MaybeFinishNextLocked() {
@@ -161,8 +149,6 @@ void FakeResolverResponseGenerator::SetResponseLocked(void* arg,
FakeResolver* resolver = closure_arg->generator->resolver_;
grpc_channel_args_destroy(resolver->next_results_);
resolver->next_results_ = closure_arg->response;
- grpc_channel_args_destroy(resolver->last_used_results_);
- resolver->last_used_results_ = grpc_channel_args_copy(closure_arg->response);
resolver->MaybeFinishNextLocked();
Delete(closure_arg);
}
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
index e5175f9b7b..708eaf1147 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
@@ -53,7 +53,8 @@ class FakeResolverResponseGenerator
// The new re-resolution response replaces any previous re-resolution
// response that may have been set by a previous call.
// If the re-resolution response is set to NULL, then the fake
- // resolver will return the last value set via \a SetResponse().
+ // resolver will not return anything when \a RequestReresolutionLocked()
+ // is called.
void SetReresolutionResponse(grpc_channel_args* response);
// Tells the resolver to return a transient failure (signalled by
diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
index f74ac5aebe..801734764b 100644
--- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
@@ -50,8 +50,6 @@ class SockaddrResolver : public Resolver {
void NextLocked(grpc_channel_args** result,
grpc_closure* on_complete) override;
- void RequestReresolutionLocked() override;
-
void ShutdownLocked() override;
private:
@@ -90,11 +88,6 @@ void SockaddrResolver::NextLocked(grpc_channel_args** target_result,
MaybeFinishNextLocked();
}
-void SockaddrResolver::RequestReresolutionLocked() {
- published_ = false;
- MaybeFinishNextLocked();
-}
-
void SockaddrResolver::ShutdownLocked() {
if (next_completion_ != nullptr) {
*target_result_ = nullptr;
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index e94186da00..57d0b3759f 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -419,6 +419,8 @@ static void continue_connect_locked(grpc_subchannel* c) {
c->next_attempt_deadline = c->backoff->NextAttemptTime();
args.deadline = std::max(c->next_attempt_deadline, min_deadline);
args.channel_args = c->args;
+ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_NONE, "connecting");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
&c->on_connected);
}
@@ -493,8 +495,6 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
if (!c->backoff_begun) {
c->backoff_begun = true;
- grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
- GRPC_ERROR_NONE, "connecting");
continue_connect_locked(c);
} else {
GPR_ASSERT(!c->have_alarm);
@@ -509,11 +509,6 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
}
GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
- // During backoff, we prefer the connectivity state of CONNECTING instead of
- // TRANSIENT_FAILURE in order to prevent triggering re-resolution
- // continuously in pick_first.
- grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
- GRPC_ERROR_NONE, "backoff");
}
}
diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc
index 3bd3059312..d23ad67ad5 100644
--- a/src/core/ext/filters/deadline/deadline_filter.cc
+++ b/src/core/ext/filters/deadline/deadline_filter.cc
@@ -379,10 +379,10 @@ static bool maybe_add_deadline_filter(grpc_channel_stack_builder* builder,
void grpc_deadline_filter_init(void) {
grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_VERY_HIGH,
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_deadline_filter, (void*)&grpc_client_deadline_filter);
grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_VERY_HIGH,
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_deadline_filter, (void*)&grpc_server_deadline_filter);
}
diff --git a/src/core/ext/filters/http/client_authority_filter.cc b/src/core/ext/filters/http/client_authority_filter.cc
index 3c0ae47e8d..1ca20ebb26 100644
--- a/src/core/ext/filters/http/client_authority_filter.cc
+++ b/src/core/ext/filters/http/client_authority_filter.cc
@@ -94,7 +94,7 @@ grpc_error* init_channel_elem(grpc_channel_element* elem,
if (default_authority_arg == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"GRPC_ARG_DEFAULT_AUTHORITY channel arg. not found. Note that direct "
- "channels must explicity specify a value for this argument.");
+ "channels must explicitly specify a value for this argument.");
}
const char* default_authority_str =
grpc_channel_arg_get_string(default_authority_arg);
@@ -146,12 +146,12 @@ static bool add_client_authority_filter(grpc_channel_stack_builder* builder,
}
void grpc_client_authority_filter_init(void) {
- grpc_channel_init_register_stage(
- GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_PRIORITY_HIGH,
- add_client_authority_filter, (void*)&grpc_client_authority_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_HIGH,
- add_client_authority_filter, (void*)&grpc_client_authority_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX,
+ add_client_authority_filter,
+ (void*)&grpc_client_authority_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
+ add_client_authority_filter,
+ (void*)&grpc_client_authority_filter);
}
void grpc_client_authority_filter_shutdown(void) {}
diff --git a/src/core/ext/filters/http/http_filters_plugin.cc b/src/core/ext/filters/http/http_filters_plugin.cc
index 38757710f3..f03fa0141d 100644
--- a/src/core/ext/filters/http/http_filters_plugin.cc
+++ b/src/core/ext/filters/http/http_filters_plugin.cc
@@ -18,7 +18,6 @@
#include <grpc/support/port_platform.h>
-#include <limits.h>
#include <string.h>
#include "src/core/ext/filters/http/client/http_client_filter.h"
@@ -52,15 +51,15 @@ static bool maybe_add_optional_filter(grpc_channel_stack_builder* builder,
bool enable = grpc_channel_arg_get_bool(
grpc_channel_args_find(channel_args, filtarg->control_channel_arg),
!grpc_channel_args_want_minimal_stack(channel_args));
- return enable ? grpc_channel_stack_builder_append_filter(
+ return enable ? grpc_channel_stack_builder_prepend_filter(
builder, filtarg->filter, nullptr, nullptr)
: true;
}
-static bool maybe_append_required_filter(grpc_channel_stack_builder* builder,
- void* arg) {
+static bool maybe_add_required_filter(grpc_channel_stack_builder* builder,
+ void* arg) {
return is_building_http_like_transport(builder)
- ? grpc_channel_stack_builder_append_filter(
+ ? grpc_channel_stack_builder_prepend_filter(
builder, static_cast<const grpc_channel_filter*>(arg),
nullptr, nullptr)
: true;
@@ -68,23 +67,23 @@ static bool maybe_append_required_filter(grpc_channel_stack_builder* builder,
void grpc_http_filters_init(void) {
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_HIGH,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter, &compress_filter);
grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_HIGH,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter, &compress_filter);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_HIGH,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter, &compress_filter);
grpc_channel_init_register_stage(
- GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_PRIORITY_HIGH,
- maybe_append_required_filter, (void*)&grpc_http_client_filter);
+ GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_required_filter, (void*)&grpc_http_client_filter);
grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_HIGH,
- maybe_append_required_filter, (void*)&grpc_http_client_filter);
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_required_filter, (void*)&grpc_http_client_filter);
grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_HIGH,
- maybe_append_required_filter, (void*)&grpc_http_server_filter);
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_required_filter, (void*)&grpc_http_server_filter);
}
void grpc_http_filters_shutdown(void) {}
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
index 0c4ffea27b..8ac34c629f 100644
--- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
@@ -162,9 +162,10 @@ void ServerLoadReportingCallData::GetCensusSafeClientIpString(
} else if (addr->sa_family == GRPC_AF_INET6) {
grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr);
*client_ip_string = static_cast<char*>(gpr_malloc(32 + 1));
- for (size_t i = 0; i < 16; ++i) {
- snprintf(*client_ip_string + i * 2, 2 + 1, "%02x",
- addr6->sin6_addr.__in6_u.__u6_addr8[i]);
+ uint32_t* addr6_next_long = reinterpret_cast<uint32_t*>(&addr6->sin6_addr);
+ for (size_t i = 0; i < 4; ++i) {
+ snprintf(*client_ip_string + 8 * i, 8 + 1, "%08x",
+ grpc_ntohl(*addr6_next_long++));
}
*size = 32;
} else {
@@ -345,8 +346,7 @@ struct ServerLoadReportingFilterStaticRegistrar {
if (registered) return;
RegisterChannelFilter<ServerLoadReportingChannelData,
ServerLoadReportingCallData>(
- "server_load_reporting", GRPC_SERVER_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_LOW, true,
+ "server_load_reporting", GRPC_SERVER_CHANNEL, INT_MAX,
MaybeAddServerLoadReportingFilter);
// Access measures to ensure they are initialized. Otherwise, we can't
// create any valid view before the first RPC.
diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc
index 7db30d5b48..1fe8288bd0 100644
--- a/src/core/ext/filters/max_age/max_age_filter.cc
+++ b/src/core/ext/filters/max_age/max_age_filter.cc
@@ -536,7 +536,7 @@ static bool maybe_add_max_age_filter(grpc_channel_stack_builder* builder,
void grpc_max_age_filter_init(void) {
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_LOW,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_max_age_filter, nullptr);
}
diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc
index 1bd9cf1426..c7fc3f2e62 100644
--- a/src/core/ext/filters/message_size/message_size_filter.cc
+++ b/src/core/ext/filters/message_size/message_size_filter.cc
@@ -311,13 +311,13 @@ static bool maybe_add_message_size_filter(grpc_channel_stack_builder* builder,
void grpc_message_size_filter_init(void) {
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_LOW,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_message_size_filter, nullptr);
grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_LOW,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_message_size_filter, nullptr);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_LOW,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_message_size_filter, nullptr);
}
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
index dfed824cd5..5bdcb387c9 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
@@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
grpc_endpoint* client = grpc_tcp_client_create_from_fd(
- grpc_fd_create(fd, "client", false), args, "fd-client");
+ grpc_fd_create(fd, "client", true), args, "fd-client");
grpc_transport* transport =
grpc_create_chttp2_transport(final_args, client, true);
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
index a0228785ee..e4bd91d07b 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
@@ -44,7 +44,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
gpr_asprintf(&name, "fd:%d", fd);
grpc_endpoint* server_endpoint =
- grpc_tcp_create(grpc_fd_create(fd, name, false),
+ grpc_tcp_create(grpc_fd_create(fd, name, true),
grpc_server_get_channel_args(server), name);
gpr_free(name);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index bc6fa0d0eb..027a57d606 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -812,6 +812,12 @@ static void set_write_state(grpc_chttp2_transport* t,
write_state_name(t->write_state),
write_state_name(st), reason));
t->write_state = st;
+ /* If the state is being reset back to idle, it means a write was just
+ * finished. Make sure all the run_after_write closures are scheduled.
+ *
+ * This is also our chance to close the transport if the transport was marked
+ * to be closed after all writes finish (for example, if we received a go-away
+ * from peer while we had some pending writes) */
if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
if (t->close_transport_on_writes_finished != nullptr) {
@@ -899,6 +905,22 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
grpc_chttp2_initiate_write_reason_string(reason));
t->is_first_write_in_batch = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
+ /* Note that the 'write_action_begin_locked' closure is being scheduled
+ * on the 'finally_scheduler' of t->combiner. This means that
+ * 'write_action_begin_locked' is called only *after* all the other
+ * closures (some of which are potentially initiating more writes on the
+ * transport) are executed on the t->combiner.
+ *
+ * The reason for scheduling on finally_scheduler is to make sure we batch
+ * as many writes as possible. 'write_action_begin_locked' is the function
+ * that gathers all the relevant bytes (which are at various places in the
+ * grpc_chttp2_transport structure) and append them to 'outbuf' field in
+ * grpc_chttp2_transport thereby batching what would have been potentially
+ * multiple write operations.
+ *
+ * Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
+ * It does not call the endpoint to write the bytes. That is done by the
+ * 'write_action' (which is scheduled by 'write_action_begin_locked') */
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
write_action_begin_locked, t,
@@ -1007,9 +1029,12 @@ static void write_action(void* gt, grpc_error* error) {
grpc_endpoint_write(
t->ep, &t->outbuf,
GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
- grpc_combiner_scheduler(t->combiner)));
+ grpc_combiner_scheduler(t->combiner)),
+ nullptr);
}
+/* Callback from the grpc_endpoint after bytes have been written by calling
+ * sendmsg */
static void write_action_end_locked(void* tp, grpc_error* error) {
GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc
index e89c363200..53932bcb7f 100644
--- a/src/core/ext/transport/chttp2/transport/flow_control.cc
+++ b/src/core/ext/transport/chttp2/transport/flow_control.cc
@@ -40,6 +40,7 @@ namespace chttp2 {
namespace {
static constexpr const int kTracePadding = 30;
+static constexpr const uint32_t kMaxWindowUpdateSize = (1u << 31) - 1;
static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
char* str;
@@ -55,7 +56,7 @@ static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) {
char* str;
- if (new_val > 0 && old_val != new_val) {
+ if (old_val != new_val) {
gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old_val, new_val);
} else {
gpr_asprintf(&str, "%" PRIu32 "", old_val);
@@ -98,10 +99,12 @@ void FlowControlTrace::Finish() {
if (sfc_ != nullptr) {
srw_str = fmt_int64_diff_str(remote_window_delta_ + remote_window,
sfc_->remote_window_delta() + remote_window);
- slw_str = fmt_int64_diff_str(local_window_delta_ + acked_local_window,
- local_window_delta_ + acked_local_window);
- saw_str = fmt_int64_diff_str(announced_window_delta_ + acked_local_window,
- announced_window_delta_ + acked_local_window);
+ slw_str =
+ fmt_int64_diff_str(local_window_delta_ + acked_local_window,
+ sfc_->local_window_delta() + acked_local_window);
+ saw_str =
+ fmt_int64_diff_str(announced_window_delta_ + acked_local_window,
+ sfc_->announced_window_delta() + acked_local_window);
} else {
srw_str = gpr_leftpad("", ' ', kTracePadding);
slw_str = gpr_leftpad("", ' ', kTracePadding);
@@ -191,7 +194,7 @@ uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
announced_window_ != target_announced_window) {
const uint32_t announce = static_cast<uint32_t> GPR_CLAMP(
- target_announced_window - announced_window_, 0, UINT32_MAX);
+ target_announced_window - announced_window_, 0, kMaxWindowUpdateSize);
announced_window_ += announce;
return announce;
}
@@ -265,7 +268,7 @@ uint32_t StreamFlowControl::MaybeSendUpdate() {
FlowControlTrace trace("s updt sent", tfc_, this);
if (local_window_delta_ > announced_window_delta_) {
uint32_t announce = static_cast<uint32_t> GPR_CLAMP(
- local_window_delta_ - announced_window_delta_, 0, UINT32_MAX);
+ local_window_delta_ - announced_window_delta_, 0, kMaxWindowUpdateSize);
UpdateAnnouncedWindowDelta(tfc_, announce);
return announce;
}