aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/subchannel.cc
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-10-24 12:29:04 -0700
committerGravatar Mark D. Roth <roth@google.com>2018-10-24 12:29:04 -0700
commitf85fd026e36aa11221bbb8211e7632acd8b85a43 (patch)
tree3fb4c9ef91998ea7f2d6ce898b5c2aa42e494c83 /src/core/ext/filters/client_channel/subchannel.cc
parent55906e4d231a886060b1917062be2b1d2cbd1497 (diff)
Client-side health checking support.
Diffstat (limited to 'src/core/ext/filters/client_channel/subchannel.cc')
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc393
1 files changed, 272 insertions, 121 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index a0a40eb2b3..e4c6efe862 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -30,6 +30,7 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/health/health_check_client.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
@@ -41,6 +42,7 @@
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
@@ -50,6 +52,7 @@
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/status_metadata.h"
#define INTERNAL_REF_BITS 16
@@ -66,6 +69,10 @@ struct state_watcher {
grpc_closure closure;
grpc_subchannel* subchannel;
grpc_connectivity_state connectivity_state;
+ grpc_connectivity_state last_connectivity_state;
+ grpc_core::OrphanablePtr<grpc_core::HealthCheckClient> health_check_client;
+ grpc_closure health_check_closure;
+ grpc_connectivity_state health_state;
};
} // namespace
@@ -78,6 +85,12 @@ typedef struct external_state_watcher {
struct external_state_watcher* prev;
} external_state_watcher;
+namespace grpc_core {
+
+class ConnectedSubchannelStateWatcher;
+
+} // namespace grpc_core
+
struct grpc_subchannel {
grpc_connector* connector;
@@ -109,19 +122,24 @@ struct grpc_subchannel {
being setup */
grpc_pollset_set* pollset_set;
+ grpc_core::UniquePtr<char> health_check_service_name;
+
/** mutex protecting remaining elements */
gpr_mu mu;
- /** active connection, or null; of type grpc_core::ConnectedSubchannel
- */
+ /** active connection, or null */
grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
+ grpc_core::OrphanablePtr<grpc_core::ConnectedSubchannelStateWatcher>
+ connected_subchannel_watcher;
/** have we seen a disconnection? */
bool disconnected;
/** are we connecting */
bool connecting;
+
/** connectivity state tracking */
grpc_connectivity_state_tracker state_tracker;
+ grpc_connectivity_state_tracker state_and_health_tracker;
external_state_watcher root_external_state_watcher;
@@ -153,6 +171,171 @@ struct grpc_subchannel_call {
grpc_millis deadline;
};
+static void maybe_start_connecting_locked(grpc_subchannel* c);
+
+static const char* subchannel_connectivity_state_change_string(
+ grpc_connectivity_state state) {
+ switch (state) {
+ case GRPC_CHANNEL_IDLE:
+ return "Subchannel state change to IDLE";
+ case GRPC_CHANNEL_CONNECTING:
+ return "Subchannel state change to CONNECTING";
+ case GRPC_CHANNEL_READY:
+ return "Subchannel state change to READY";
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ return "Subchannel state change to TRANSIENT_FAILURE";
+ case GRPC_CHANNEL_SHUTDOWN:
+ return "Subchannel state change to SHUTDOWN";
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
+static void set_subchannel_connectivity_state_locked(
+ grpc_subchannel* c, grpc_connectivity_state state, grpc_error* error,
+ const char* reason) {
+ if (c->channelz_subchannel != nullptr) {
+ c->channelz_subchannel->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string(
+ subchannel_connectivity_state_change_string(state)));
+ }
+ grpc_connectivity_state_set(&c->state_tracker, state, error, reason);
+}
+
+namespace grpc_core {
+
+class ConnectedSubchannelStateWatcher
+ : public InternallyRefCounted<ConnectedSubchannelStateWatcher> {
+ public:
+ // Must be instantiated while holding c->mu.
+ explicit ConnectedSubchannelStateWatcher(grpc_subchannel* c)
+ : subchannel_(c) {
+ // Steal subchannel ref for connecting.
+ GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher");
+ GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting");
+ // Start watching for connectivity state changes.
+ // Callback uses initial ref to this.
+ GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChanged, this,
+ grpc_schedule_on_exec_ctx);
+ c->connected_subchannel->NotifyOnStateChange(c->pollset_set,
+ &pending_connectivity_state_,
+ &on_connectivity_changed_);
+ // Start health check if needed.
+ grpc_connectivity_state health_state = GRPC_CHANNEL_READY;
+ if (c->health_check_service_name != nullptr) {
+ health_check_client_ = grpc_core::MakeOrphanable<HealthCheckClient>(
+ c->health_check_service_name.get(), c->connected_subchannel,
+ c->pollset_set, c->channelz_subchannel);
+ GRPC_CLOSURE_INIT(&on_health_changed_, OnHealthChanged, this,
+ grpc_schedule_on_exec_ctx);
+ Ref().release(); // Ref for health callback tracked manually.
+ health_check_client_->NotifyOnHealthChange(&health_state_,
+ &on_health_changed_);
+ health_state = GRPC_CHANNEL_CONNECTING;
+ }
+ // Report initial state.
+ set_subchannel_connectivity_state_locked(
+ c, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "subchannel_connected");
+ grpc_connectivity_state_set(&c->state_and_health_tracker, health_state,
+ GRPC_ERROR_NONE, "subchannel_connected");
+ }
+
+ ~ConnectedSubchannelStateWatcher() {
+ GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher");
+ }
+
+ void Orphan() override { health_check_client_.reset(); }
+
+ private:
+ static void OnConnectivityChanged(void* arg, grpc_error* error) {
+ auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
+ grpc_subchannel* c = self->subchannel_;
+ {
+ MutexLock lock(&c->mu);
+ switch (self->pending_connectivity_state_) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ case GRPC_CHANNEL_SHUTDOWN: {
+ if (!c->disconnected && c->connected_subchannel != nullptr) {
+ if (grpc_trace_stream_refcount.enabled()) {
+ gpr_log(GPR_INFO,
+ "Connected subchannel %p of subchannel %p has gone into "
+ "%s. Attempting to reconnect.",
+ c->connected_subchannel.get(), c,
+ grpc_connectivity_state_name(
+ self->pending_connectivity_state_));
+ }
+ c->connected_subchannel.reset();
+ c->connected_subchannel_watcher.reset();
+ self->last_connectivity_state_ = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ set_subchannel_connectivity_state_locked(
+ c, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error),
+ "reflect_child");
+ grpc_connectivity_state_set(&c->state_and_health_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "reflect_child");
+ c->backoff_begun = false;
+ c->backoff->Reset();
+ maybe_start_connecting_locked(c);
+ } else {
+ self->last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN;
+ }
+ self->health_check_client_.reset();
+ break;
+ }
+ default: {
+ // In principle, this should never happen. We should not get
+ // a callback for READY, because that was the state we started
+ // this watch from. And a connected subchannel should never go
+ // from READY to CONNECTING or IDLE.
+ self->last_connectivity_state_ = self->pending_connectivity_state_;
+ set_subchannel_connectivity_state_locked(
+ c, self->pending_connectivity_state_, GRPC_ERROR_REF(error),
+ "reflect_child");
+ if (self->pending_connectivity_state_ != GRPC_CHANNEL_READY) {
+ grpc_connectivity_state_set(&c->state_and_health_tracker,
+ self->pending_connectivity_state_,
+ GRPC_ERROR_REF(error), "reflect_child");
+ }
+ c->connected_subchannel->NotifyOnStateChange(
+ nullptr, &self->pending_connectivity_state_,
+ &self->on_connectivity_changed_);
+ self = nullptr; // So we don't unref below.
+ }
+ }
+ }
+ // Don't unref until we've released the lock, because this might
+ // cause the subchannel (which contains the lock) to be destroyed.
+ if (self != nullptr) self->Unref();
+ }
+
+ static void OnHealthChanged(void* arg, grpc_error* error) {
+ auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
+ if (self->health_state_ == GRPC_CHANNEL_SHUTDOWN) {
+ self->Unref();
+ return;
+ }
+ grpc_subchannel* c = self->subchannel_;
+ MutexLock lock(&c->mu);
+ if (self->last_connectivity_state_ == GRPC_CHANNEL_READY) {
+ grpc_connectivity_state_set(&c->state_and_health_tracker,
+ self->health_state_, GRPC_ERROR_REF(error),
+ "health_changed");
+ }
+ self->health_check_client_->NotifyOnHealthChange(&self->health_state_,
+ &self->on_health_changed_);
+ }
+
+ grpc_subchannel* subchannel_;
+ grpc_closure on_connectivity_changed_;
+ grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY;
+ grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_READY;
+ grpc_core::OrphanablePtr<grpc_core::HealthCheckClient> health_check_client_;
+ grpc_closure on_health_changed_;
+ grpc_connectivity_state health_state_ = GRPC_CHANNEL_CONNECTING;
+};
+
+} // namespace grpc_core
+
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
(grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
sizeof(grpc_subchannel_call)))
@@ -198,8 +381,10 @@ static void subchannel_destroy(void* arg, grpc_error* error) {
c->channelz_subchannel.reset();
}
gpr_free((void*)c->filters);
+ c->health_check_service_name.reset();
grpc_channel_args_destroy(c->args);
grpc_connectivity_state_destroy(&c->state_tracker);
+ grpc_connectivity_state_destroy(&c->state_and_health_tracker);
grpc_connector_unref(c->connector);
grpc_pollset_set_destroy(c->pollset_set);
grpc_subchannel_key_destroy(c->key);
@@ -262,6 +447,7 @@ static void disconnect(grpc_subchannel* c) {
grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Subchannel disconnected"));
c->connected_subchannel.reset();
+ c->connected_subchannel_watcher.reset();
gpr_mu_unlock(&c->mu);
}
@@ -337,6 +523,31 @@ static void parse_args_for_backoff_values(
.set_max_backoff(max_backoff_ms);
}
+namespace grpc_core {
+namespace {
+
+struct HealthCheckParams {
+ UniquePtr<char> service_name;
+
+ static void Parse(const grpc_json* field, HealthCheckParams* params) {
+ if (strcmp(field->key, "healthCheckConfig") == 0) {
+ if (field->type != GRPC_JSON_OBJECT) return;
+ for (grpc_json* sub_field = field->child; sub_field != nullptr;
+ sub_field = sub_field->next) {
+ if (sub_field->key == nullptr) return;
+ if (strcmp(sub_field->key, "serviceName") == 0) {
+ if (params->service_name != nullptr) return; // Duplicate.
+ if (sub_field->type != GRPC_JSON_STRING) return;
+ params->service_name.reset(gpr_strdup(sub_field->value));
+ }
+ }
+ }
+ }
+};
+
+} // namespace
+} // namespace grpc_core
+
grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
const grpc_subchannel_args* args) {
grpc_subchannel_key* key = grpc_subchannel_key_create(args);
@@ -387,12 +598,28 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
grpc_schedule_on_exec_ctx);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
+ grpc_connectivity_state_init(&c->state_and_health_tracker, GRPC_CHANNEL_IDLE,
+ "subchannel");
grpc_core::BackOff::Options backoff_options;
parse_args_for_backoff_values(args->args, &backoff_options,
&c->min_connect_timeout_ms);
c->backoff.Init(backoff_options);
gpr_mu_init(&c->mu);
+ // Check whether we should enable health checking.
+ const char* service_config_json = grpc_channel_arg_get_string(
+ grpc_channel_args_find(c->args, GRPC_ARG_SERVICE_CONFIG));
+ if (service_config_json != nullptr) {
+ grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
+ grpc_core::ServiceConfig::Create(service_config_json);
+ if (service_config != nullptr) {
+ grpc_core::HealthCheckParams params;
+ service_config->ParseGlobalParams(grpc_core::HealthCheckParams::Parse,
+ &params);
+ c->health_check_service_name = std::move(params.service_name);
+ }
+ }
+
const grpc_arg* arg =
grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ);
bool channelz_enabled =
@@ -428,35 +655,6 @@ intptr_t grpc_subchannel_get_child_socket_uuid(grpc_subchannel* subchannel) {
}
}
-static const char* subchannel_connectivity_state_change_string(
- grpc_connectivity_state state) {
- switch (state) {
- case GRPC_CHANNEL_IDLE:
- return "Subchannel state change to IDLE";
- case GRPC_CHANNEL_CONNECTING:
- return "Subchannel state change to CONNECTING";
- case GRPC_CHANNEL_READY:
- return "Subchannel state change to READY";
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- return "Subchannel state change to TRANSIENT_FAILURE";
- case GRPC_CHANNEL_SHUTDOWN:
- return "Subchannel state change to SHUTDOWN";
- }
- GPR_UNREACHABLE_CODE(return "UNKNOWN");
-}
-
-static void set_subchannel_connectivity_state_locked(
- grpc_subchannel* c, grpc_connectivity_state state, grpc_error* error,
- const char* reason) {
- if (c->channelz_subchannel != nullptr) {
- c->channelz_subchannel->AddTraceEvent(
- grpc_core::channelz::ChannelTrace::Severity::Info,
- grpc_slice_from_static_string(
- subchannel_connectivity_state_change_string(state)));
- }
- grpc_connectivity_state_set(&c->state_tracker, state, error, reason);
-}
-
static void continue_connect_locked(grpc_subchannel* c) {
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
@@ -467,15 +665,19 @@ static void continue_connect_locked(grpc_subchannel* c) {
args.channel_args = c->args;
set_subchannel_connectivity_state_locked(c, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "connecting");
+ grpc_connectivity_state_set(&c->state_and_health_tracker,
+ GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
+ "connecting");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
&c->on_connected);
}
-grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c,
- grpc_error** error) {
- grpc_connectivity_state state;
+grpc_connectivity_state grpc_subchannel_check_connectivity(
+ grpc_subchannel* c, grpc_error** error, bool inhibit_health_checks) {
gpr_mu_lock(&c->mu);
- state = grpc_connectivity_state_get(&c->state_tracker, error);
+ grpc_connectivity_state_tracker* tracker =
+ inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker;
+ grpc_connectivity_state state = grpc_connectivity_state_get(tracker, error);
gpr_mu_unlock(&c->mu);
return state;
}
@@ -533,7 +735,8 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
/* Already connected: don't restart */
return;
}
- if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
+ if (!grpc_connectivity_state_has_watchers(&c->state_tracker) &&
+ !grpc_connectivity_state_has_watchers(&c->state_and_health_tracker)) {
/* Nobody is interested in connecting: so don't just yet */
return;
}
@@ -560,16 +763,18 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
void grpc_subchannel_notify_on_state_change(
grpc_subchannel* c, grpc_pollset_set* interested_parties,
- grpc_connectivity_state* state, grpc_closure* notify) {
+ grpc_connectivity_state* state, grpc_closure* notify,
+ bool inhibit_health_checks) {
+ grpc_connectivity_state_tracker* tracker =
+ inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker;
external_state_watcher* w;
-
if (state == nullptr) {
gpr_mu_lock(&c->mu);
for (w = c->root_external_state_watcher.next;
w != &c->root_external_state_watcher; w = w->next) {
if (w->notify == notify) {
- grpc_connectivity_state_notify_on_state_change(&c->state_tracker,
- nullptr, &w->closure);
+ grpc_connectivity_state_notify_on_state_change(tracker, nullptr,
+ &w->closure);
}
}
gpr_mu_unlock(&c->mu);
@@ -588,62 +793,12 @@ void grpc_subchannel_notify_on_state_change(
w->next = &c->root_external_state_watcher;
w->prev = w->next->prev;
w->next->prev = w->prev->next = w;
- grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
- &w->closure);
+ grpc_connectivity_state_notify_on_state_change(tracker, state, &w->closure);
maybe_start_connecting_locked(c);
gpr_mu_unlock(&c->mu);
}
}
-static void on_connected_subchannel_connectivity_changed(void* p,
- grpc_error* error) {
- state_watcher* connected_subchannel_watcher = static_cast<state_watcher*>(p);
- grpc_subchannel* c = connected_subchannel_watcher->subchannel;
- gpr_mu* mu = &c->mu;
-
- gpr_mu_lock(mu);
-
- switch (connected_subchannel_watcher->connectivity_state) {
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- case GRPC_CHANNEL_SHUTDOWN: {
- if (!c->disconnected && c->connected_subchannel != nullptr) {
- if (grpc_trace_stream_refcount.enabled()) {
- gpr_log(GPR_INFO,
- "Connected subchannel %p of subchannel %p has gone into %s. "
- "Attempting to reconnect.",
- c->connected_subchannel.get(), c,
- grpc_connectivity_state_name(
- connected_subchannel_watcher->connectivity_state));
- }
- c->connected_subchannel.reset();
- set_subchannel_connectivity_state_locked(
- c, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error),
- "reflect_child");
- c->backoff_begun = false;
- c->backoff->Reset();
- maybe_start_connecting_locked(c);
- } else {
- connected_subchannel_watcher->connectivity_state =
- GRPC_CHANNEL_SHUTDOWN;
- }
- break;
- }
- default: {
- set_subchannel_connectivity_state_locked(
- c, connected_subchannel_watcher->connectivity_state,
- GRPC_ERROR_REF(error), "reflect_child");
- GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- c->connected_subchannel->NotifyOnStateChange(
- nullptr, &connected_subchannel_watcher->connectivity_state,
- &connected_subchannel_watcher->closure);
- connected_subchannel_watcher = nullptr;
- }
- }
- gpr_mu_unlock(mu);
- GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher");
- gpr_free(connected_subchannel_watcher);
-}
-
static bool publish_transport_locked(grpc_subchannel* c) {
/* construct channel stack */
grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
@@ -670,17 +825,7 @@ static bool publish_transport_locked(grpc_subchannel* c) {
intptr_t socket_uuid = c->connecting_result.socket_uuid;
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
- /* initialize state watcher */
- state_watcher* connected_subchannel_watcher = static_cast<state_watcher*>(
- gpr_zalloc(sizeof(*connected_subchannel_watcher)));
- connected_subchannel_watcher->subchannel = c;
- connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY;
- GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure,
- on_connected_subchannel_connectivity_changed,
- connected_subchannel_watcher, grpc_schedule_on_exec_ctx);
-
if (c->disconnected) {
- gpr_free(connected_subchannel_watcher);
grpc_channel_stack_destroy(stk);
gpr_free(stk);
return false;
@@ -692,17 +837,10 @@ static bool publish_transport_locked(grpc_subchannel* c) {
gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
c->connected_subchannel.get(), c);
- /* setup subchannel watching connected subchannel for changes; subchannel
- ref for connecting is donated to the state watcher */
- GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
- c->connected_subchannel->NotifyOnStateChange(
- c->pollset_set, &connected_subchannel_watcher->connectivity_state,
- &connected_subchannel_watcher->closure);
-
- /* signal completion */
- set_subchannel_connectivity_state_locked(c, GRPC_CHANNEL_READY,
- GRPC_ERROR_NONE, "connected");
+ // Instantiate state watcher. Will clean itself up.
+ c->connected_subchannel_watcher =
+ grpc_core::MakeOrphanable<grpc_core::ConnectedSubchannelStateWatcher>(c);
+
return true;
}
@@ -725,6 +863,12 @@ static void on_subchannel_connected(void* arg, grpc_error* error) {
"Connect Failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
"connect_failed");
+ grpc_connectivity_state_set(
+ &c->state_and_health_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Connect Failed", &error, 1),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
+ "connect_failed");
const char* errmsg = grpc_error_string(error);
gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
@@ -956,15 +1100,8 @@ void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
grpc_subchannel_call** call) {
- size_t allocation_size =
- GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call));
- if (args.parent_data_size > 0) {
- allocation_size +=
- GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) +
- args.parent_data_size;
- } else {
- allocation_size += channel_stack_->call_stack_size;
- }
+ const size_t allocation_size =
+ GetInitialCallSizeEstimate(args.parent_data_size);
*call = static_cast<grpc_subchannel_call*>(
gpr_arena_alloc(args.arena, allocation_size));
grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
@@ -994,4 +1131,18 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
return GRPC_ERROR_NONE;
}
+size_t ConnectedSubchannel::GetInitialCallSizeEstimate(
+ size_t parent_data_size) const {
+ size_t allocation_size =
+ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call));
+ if (parent_data_size > 0) {
+ allocation_size +=
+ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) +
+ parent_data_size;
+ } else {
+ allocation_size += channel_stack_->call_stack_size;
+ }
+ return allocation_size;
+}
+
} // namespace grpc_core