aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters
diff options
context:
space:
mode:
authorGravatar Soheil Hassas Yeganeh <soheil@google.com>2018-11-02 15:20:02 -0400
committerGravatar Soheil Hassas Yeganeh <soheil@google.com>2018-11-05 10:12:39 -0500
commit48e4a81b05f2ad6541d72e819cd4f638055f13d5 (patch)
tree8d5b64b7113721afb2eb4a0363cbd3fd7e47ff41 /src/core/ext/filters
parent5e6c4491bf60aa91bd3e4fed3c8203601a4c795e (diff)
Remeve memset(0) from arena allocated memory.
Callers are updated to properly initialize the memory. This behavior can be overridden using GRPC_ARENA_INIT_STRATEGY environment variable.
Diffstat (limited to 'src/core/ext/filters')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc270
-rw-r--r--src/core/ext/filters/client_channel/health/health_check_client.cc5
-rw-r--r--src/core/ext/filters/client_channel/health/health_check_client.h2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h12
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc26
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc15
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.cc42
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.h22
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.cc48
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.cc44
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.cc51
-rw-r--r--src/core/ext/filters/message_size/message_size_filter.cc95
12 files changed, 352 insertions, 280 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 58db7a05b4..d42961fc60 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -938,12 +938,26 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
// (census filter is on top of this one)
// - add census stats for retries
+namespace {
+struct call_data;
+
// State used for starting a retryable batch on a subchannel call.
// This provides its own grpc_transport_stream_op_batch and other data
// structures needed to populate the ops in the batch.
// We allocate one struct on the arena for each attempt at starting a
// batch on a given subchannel call.
-typedef struct {
+struct subchannel_batch_data {
+ subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount,
+ bool set_on_complete);
+ // All dtor code must be added in `destroy`. This is because we may
+ // call closures in `subchannel_batch_data` after they are unrefed by
+ // `batch_data_unref`, and msan would complain about accessing this class
+ // after calling dtor. As a result we cannot call the `dtor` in
+ // `batch_data_unref`.
+ // TODO(soheil): We should try to call the dtor in `batch_data_unref`.
+ ~subchannel_batch_data() { destroy(); }
+ void destroy();
+
gpr_refcount refs;
grpc_call_element* elem;
grpc_subchannel_call* subchannel_call; // Holds a ref.
@@ -952,11 +966,23 @@ typedef struct {
grpc_transport_stream_op_batch batch;
// For intercepting on_complete.
grpc_closure on_complete;
-} subchannel_batch_data;
+};
// Retry state associated with a subchannel call.
// Stored in the parent_data of the subchannel call object.
-typedef struct {
+struct subchannel_call_retry_state {
+ explicit subchannel_call_retry_state(grpc_call_context_element* context)
+ : batch_payload(context),
+ started_send_initial_metadata(false),
+ completed_send_initial_metadata(false),
+ started_send_trailing_metadata(false),
+ completed_send_trailing_metadata(false),
+ started_recv_initial_metadata(false),
+ completed_recv_initial_metadata(false),
+ started_recv_trailing_metadata(false),
+ completed_recv_trailing_metadata(false),
+ retry_dispatched(false) {}
+
// subchannel_batch_data.batch.payload points to this.
grpc_transport_stream_op_batch_payload batch_payload;
// For send_initial_metadata.
@@ -975,7 +1001,7 @@ typedef struct {
// For intercepting recv_initial_metadata.
grpc_metadata_batch recv_initial_metadata;
grpc_closure recv_initial_metadata_ready;
- bool trailing_metadata_available;
+ bool trailing_metadata_available = false;
// For intercepting recv_message.
grpc_closure recv_message_ready;
grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
@@ -985,10 +1011,10 @@ typedef struct {
grpc_closure recv_trailing_metadata_ready;
// These fields indicate which ops have been started and completed on
// this subchannel call.
- size_t started_send_message_count;
- size_t completed_send_message_count;
- size_t started_recv_message_count;
- size_t completed_recv_message_count;
+ size_t started_send_message_count = 0;
+ size_t completed_send_message_count = 0;
+ size_t started_recv_message_count = 0;
+ size_t completed_recv_message_count = 0;
bool started_send_initial_metadata : 1;
bool completed_send_initial_metadata : 1;
bool started_send_trailing_metadata : 1;
@@ -999,12 +1025,12 @@ typedef struct {
bool completed_recv_trailing_metadata : 1;
// State for callback processing.
bool retry_dispatched : 1;
- subchannel_batch_data* recv_initial_metadata_ready_deferred_batch;
- grpc_error* recv_initial_metadata_error;
- subchannel_batch_data* recv_message_ready_deferred_batch;
- grpc_error* recv_message_error;
- subchannel_batch_data* recv_trailing_metadata_internal_batch;
-} subchannel_call_retry_state;
+ subchannel_batch_data* recv_initial_metadata_ready_deferred_batch = nullptr;
+ grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
+ subchannel_batch_data* recv_message_ready_deferred_batch = nullptr;
+ grpc_error* recv_message_error = GRPC_ERROR_NONE;
+ subchannel_batch_data* recv_trailing_metadata_internal_batch = nullptr;
+};
// Pending batches stored in call data.
typedef struct {
@@ -1019,7 +1045,44 @@ typedef struct {
Handles queueing of stream ops until a call object is ready, waiting
for initial metadata before trying to create a call object,
and handling cancellation gracefully. */
-typedef struct client_channel_call_data {
+struct call_data {
+ call_data(grpc_call_element* elem, const channel_data& chand,
+ const grpc_call_element_args& args)
+ : deadline_state(elem, args.call_stack, args.call_combiner,
+ GPR_LIKELY(chand.deadline_checking_enabled)
+ ? args.deadline
+ : GRPC_MILLIS_INF_FUTURE),
+ path(grpc_slice_ref_internal(args.path)),
+ call_start_time(args.start_time),
+ deadline(args.deadline),
+ arena(args.arena),
+ owning_call(args.call_stack),
+ call_combiner(args.call_combiner),
+ pending_send_initial_metadata(false),
+ pending_send_message(false),
+ pending_send_trailing_metadata(false),
+ enable_retries(chand.enable_retries),
+ retry_committed(false),
+ last_attempt_got_server_pushback(false) {}
+
+ ~call_data() {
+ if (GPR_LIKELY(subchannel_call != nullptr)) {
+ GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call,
+ "client_channel_destroy_call");
+ }
+ grpc_slice_unref_internal(path);
+ GRPC_ERROR_UNREF(cancel_error);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) {
+ GPR_ASSERT(pending_batches[i].batch == nullptr);
+ }
+ for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
+ if (pick.subchannel_call_context[i].value != nullptr) {
+ pick.subchannel_call_context[i].destroy(
+ pick.subchannel_call_context[i].value);
+ }
+ }
+ }
+
// State for handling deadlines.
// The code in deadline_filter.c requires this to be the first field.
// TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
@@ -1038,24 +1101,24 @@ typedef struct client_channel_call_data {
grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;
- grpc_subchannel_call* subchannel_call;
+ grpc_subchannel_call* subchannel_call = nullptr;
// Set when we get a cancel_stream op.
- grpc_error* cancel_error;
+ grpc_error* cancel_error = GRPC_ERROR_NONE;
grpc_core::LoadBalancingPolicy::PickState pick;
grpc_closure pick_closure;
grpc_closure pick_cancel_closure;
- grpc_polling_entity* pollent;
- bool pollent_added_to_interested_parties;
+ grpc_polling_entity* pollent = nullptr;
+ bool pollent_added_to_interested_parties = false;
// Batches are added to this list when received from above.
// They are removed when we are done handling the batch (i.e., when
// either we have invoked all of the batch's callbacks or we have
// passed the batch down to the subchannel call and are not
// intercepting any of its callbacks).
- pending_batch pending_batches[MAX_PENDING_BATCHES];
+ pending_batch pending_batches[MAX_PENDING_BATCHES] = {};
bool pending_send_initial_metadata : 1;
bool pending_send_message : 1;
bool pending_send_trailing_metadata : 1;
@@ -1064,8 +1127,8 @@ typedef struct client_channel_call_data {
bool enable_retries : 1;
bool retry_committed : 1;
bool last_attempt_got_server_pushback : 1;
- int num_attempts_completed;
- size_t bytes_buffered_for_retry;
+ int num_attempts_completed = 0;
+ size_t bytes_buffered_for_retry = 0;
grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
grpc_timer retry_timer;
@@ -1076,12 +1139,12 @@ typedef struct client_channel_call_data {
// until all of these batches have completed.
// Note that we actually only need to track replay batches, but it's
// easier to track all batches with send ops.
- int num_pending_retriable_subchannel_send_batches;
+ int num_pending_retriable_subchannel_send_batches = 0;
// Cached data for retrying send ops.
// send_initial_metadata
- bool seen_send_initial_metadata;
- grpc_linked_mdelem* send_initial_metadata_storage;
+ bool seen_send_initial_metadata = false;
+ grpc_linked_mdelem* send_initial_metadata_storage = nullptr;
grpc_metadata_batch send_initial_metadata;
uint32_t send_initial_metadata_flags;
gpr_atm* peer_string;
@@ -1092,14 +1155,13 @@ typedef struct client_channel_call_data {
// Note: We inline the cache for the first 3 send_message ops and use
// dynamic allocation after that. This number was essentially picked
// at random; it could be changed in the future to tune performance.
- grpc_core::ManualConstructor<
- grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>>
- send_messages;
+ grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages;
// send_trailing_metadata
- bool seen_send_trailing_metadata;
- grpc_linked_mdelem* send_trailing_metadata_storage;
+ bool seen_send_trailing_metadata = false;
+ grpc_linked_mdelem* send_trailing_metadata_storage = nullptr;
grpc_metadata_batch send_trailing_metadata;
-} call_data;
+};
+} // namespace
// Forward declarations.
static void retry_commit(grpc_call_element* elem,
@@ -1143,7 +1205,7 @@ static void maybe_cache_send_ops_for_batch(call_data* calld,
gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
new (cache) grpc_core::ByteStreamCache(
std::move(batch->payload->send_message.send_message));
- calld->send_messages->push_back(cache);
+ calld->send_messages.push_back(cache);
}
// Save metadata batch for send_trailing_metadata ops.
if (batch->send_trailing_metadata) {
@@ -1180,7 +1242,7 @@ static void free_cached_send_message(channel_data* chand, call_data* calld,
"chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
chand, calld, idx);
}
- (*calld->send_messages)[idx]->Destroy();
+ calld->send_messages[idx]->Destroy();
}
// Frees cached send_trailing_metadata.
@@ -1650,55 +1712,66 @@ static bool maybe_retry(grpc_call_element* elem,
// subchannel_batch_data
//
-// Creates a subchannel_batch_data object on the call's arena with the
-// specified refcount. If set_on_complete is true, the batch's
-// on_complete callback will be set to point to on_complete();
-// otherwise, the batch's on_complete callback will be null.
-static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
- int refcount,
- bool set_on_complete) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
+namespace {
+subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem,
+ call_data* calld, int refcount,
+ bool set_on_complete)
+ : elem(elem),
+ subchannel_call(GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call,
+ "batch_data_create")) {
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
- subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(
- gpr_arena_alloc(calld->arena, sizeof(*batch_data)));
- batch_data->elem = elem;
- batch_data->subchannel_call =
- GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
- batch_data->batch.payload = &retry_state->batch_payload;
- gpr_ref_init(&batch_data->refs, refcount);
+ batch.payload = &retry_state->batch_payload;
+ gpr_ref_init(&refs, refcount);
if (set_on_complete) {
- GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
+ GRPC_CLOSURE_INIT(&on_complete, ::on_complete, this,
grpc_schedule_on_exec_ctx);
- batch_data->batch.on_complete = &batch_data->on_complete;
+ batch.on_complete = &on_complete;
}
GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
+}
+
+void subchannel_batch_data::destroy() {
+ subchannel_call_retry_state* retry_state =
+ static_cast<subchannel_call_retry_state*>(
+ grpc_connected_subchannel_call_get_parent_data(subchannel_call));
+ if (batch.send_initial_metadata) {
+ grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
+ }
+ if (batch.send_trailing_metadata) {
+ grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
+ }
+ if (batch.recv_initial_metadata) {
+ grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
+ }
+ if (batch.recv_trailing_metadata) {
+ grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
+ }
+ GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "batch_data_unref");
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
+}
+} // namespace
+
+// Creates a subchannel_batch_data object on the call's arena with the
+// specified refcount. If set_on_complete is true, the batch's
+// on_complete callback will be set to point to on_complete();
+// otherwise, the batch's on_complete callback will be null.
+static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
+ int refcount,
+ bool set_on_complete) {
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ subchannel_batch_data* batch_data =
+ new (gpr_arena_alloc(calld->arena, sizeof(*batch_data)))
+ subchannel_batch_data(elem, calld, refcount, set_on_complete);
return batch_data;
}
static void batch_data_unref(subchannel_batch_data* batch_data) {
if (gpr_unref(&batch_data->refs)) {
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
- grpc_connected_subchannel_call_get_parent_data(
- batch_data->subchannel_call));
- if (batch_data->batch.send_initial_metadata) {
- grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
- }
- if (batch_data->batch.send_trailing_metadata) {
- grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
- }
- if (batch_data->batch.recv_initial_metadata) {
- grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
- }
- if (batch_data->batch.recv_trailing_metadata) {
- grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
- }
- GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref");
- call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
- GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
+ batch_data->destroy();
}
}
@@ -1996,7 +2069,7 @@ static bool pending_batch_is_unstarted(
return true;
}
if (pending->batch->send_message &&
- retry_state->started_send_message_count < calld->send_messages->size()) {
+ retry_state->started_send_message_count < calld->send_messages.size()) {
return true;
}
if (pending->batch->send_trailing_metadata &&
@@ -2152,7 +2225,7 @@ static void add_closures_for_replay_or_pending_send_ops(
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
bool have_pending_send_message_ops =
- retry_state->started_send_message_count < calld->send_messages->size();
+ retry_state->started_send_message_count < calld->send_messages.size();
bool have_pending_send_trailing_metadata_op =
calld->seen_send_trailing_metadata &&
!retry_state->started_send_trailing_metadata;
@@ -2344,7 +2417,7 @@ static void add_retriable_send_message_op(
chand, calld, retry_state->started_send_message_count);
}
grpc_core::ByteStreamCache* cache =
- (*calld->send_messages)[retry_state->started_send_message_count];
+ calld->send_messages[retry_state->started_send_message_count];
++retry_state->started_send_message_count;
retry_state->send_message.Init(cache);
batch_data->batch.send_message = true;
@@ -2476,7 +2549,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
}
// send_message.
// Note that we can only have one send_message op in flight at a time.
- if (retry_state->started_send_message_count < calld->send_messages->size() &&
+ if (retry_state->started_send_message_count < calld->send_messages.size() &&
retry_state->started_send_message_count ==
retry_state->completed_send_message_count &&
!calld->pending_send_message) {
@@ -2497,7 +2570,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
// to start, since we can't send down any more send_message ops after
// send_trailing_metadata.
if (calld->seen_send_trailing_metadata &&
- retry_state->started_send_message_count == calld->send_messages->size() &&
+ retry_state->started_send_message_count == calld->send_messages.size() &&
!retry_state->started_send_trailing_metadata &&
!calld->pending_send_trailing_metadata) {
if (grpc_client_channel_trace.enabled()) {
@@ -2549,7 +2622,7 @@ static void add_subchannel_batches_for_pending_batches(
// send_message ops after send_trailing_metadata.
if (batch->send_trailing_metadata &&
(retry_state->started_send_message_count + batch->send_message <
- calld->send_messages->size() ||
+ calld->send_messages.size() ||
retry_state->started_send_trailing_metadata)) {
continue;
}
@@ -2716,11 +2789,9 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
} else {
if (parent_data_size > 0) {
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
- grpc_connected_subchannel_call_get_parent_data(
- calld->subchannel_call));
- retry_state->batch_payload.context = calld->pick.subchannel_call_context;
+ new (grpc_connected_subchannel_call_get_parent_data(
+ calld->subchannel_call))
+ subchannel_call_retry_state(calld->pick.subchannel_call_context);
}
pending_batches_resume(elem);
}
@@ -3260,21 +3331,8 @@ static void cc_start_transport_stream_op_batch(
/* Constructor for call_data */
static grpc_error* cc_init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- // Initialize data members.
- calld->path = grpc_slice_ref_internal(args->path);
- calld->call_start_time = args->start_time;
- calld->deadline = args->deadline;
- calld->arena = args->arena;
- calld->owning_call = args->call_stack;
- calld->call_combiner = args->call_combiner;
- if (GPR_LIKELY(chand->deadline_checking_enabled)) {
- grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
- calld->deadline);
- }
- calld->enable_retries = chand->enable_retries;
- calld->send_messages.Init();
+ new (elem->call_data) call_data(elem, *chand, *args);
return GRPC_ERROR_NONE;
}
@@ -3283,34 +3341,12 @@ static void cc_destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* then_schedule_closure) {
call_data* calld = static_cast<call_data*>(elem->call_data);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- if (GPR_LIKELY(chand->deadline_checking_enabled)) {
- grpc_deadline_state_destroy(elem);
- }
- grpc_slice_unref_internal(calld->path);
- calld->retry_throttle_data.reset();
- calld->method_params.reset();
- GRPC_ERROR_UNREF(calld->cancel_error);
if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
then_schedule_closure);
then_schedule_closure = nullptr;
- GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
- "client_channel_destroy_call");
- }
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- GPR_ASSERT(calld->pending_batches[i].batch == nullptr);
- }
- if (GPR_LIKELY(calld->pick.connected_subchannel != nullptr)) {
- calld->pick.connected_subchannel.reset();
- }
- for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
- if (calld->pick.subchannel_call_context[i].value != nullptr) {
- calld->pick.subchannel_call_context[i].destroy(
- calld->pick.subchannel_call_context[i].value);
- }
}
- calld->send_messages.Destroy();
+ calld->~call_data();
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc
index 591637aa86..587919596f 100644
--- a/src/core/ext/filters/client_channel/health/health_check_client.cc
+++ b/src/core/ext/filters/client_channel/health/health_check_client.cc
@@ -286,10 +286,9 @@ HealthCheckClient::CallState::CallState(
health_check_client_(std::move(health_check_client)),
pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
arena_(gpr_arena_create(health_check_client_->connected_subchannel_
- ->GetInitialCallSizeEstimate(0))) {
- memset(&call_combiner_, 0, sizeof(call_combiner_));
+ ->GetInitialCallSizeEstimate(0))),
+ payload_(context_) {
grpc_call_combiner_init(&call_combiner_);
- memset(context_, 0, sizeof(context_));
gpr_atm_rel_store(&seen_response_, static_cast<gpr_atm>(0));
}
diff --git a/src/core/ext/filters/client_channel/health/health_check_client.h b/src/core/ext/filters/client_channel/health/health_check_client.h
index 7f77348f18..f6babef7d6 100644
--- a/src/core/ext/filters/client_channel/health/health_check_client.h
+++ b/src/core/ext/filters/client_channel/health/health_check_client.h
@@ -97,7 +97,7 @@ class HealthCheckClient
gpr_arena* arena_;
grpc_call_combiner call_combiner_;
- grpc_call_context_element context_[GRPC_CONTEXT_COUNT];
+ grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
// The streaming call to the backend. Always non-NULL.
grpc_subchannel_call* call_;
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 21f80b7b94..b0040457a6 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -63,29 +63,29 @@ class LoadBalancingPolicy
/// State used for an LB pick.
struct PickState {
/// Initial metadata associated with the picking call.
- grpc_metadata_batch* initial_metadata;
+ grpc_metadata_batch* initial_metadata = nullptr;
/// Bitmask used for selective cancelling. See
/// \a CancelMatchingPicksLocked() and \a GRPC_INITIAL_METADATA_* in
/// grpc_types.h.
- uint32_t initial_metadata_flags;
+ uint32_t initial_metadata_flags = 0;
/// Storage for LB token in \a initial_metadata, or nullptr if not used.
grpc_linked_mdelem lb_token_mdelem_storage;
/// Closure to run when pick is complete, if not completed synchronously.
/// If null, pick will fail if a result is not available synchronously.
- grpc_closure* on_complete;
+ grpc_closure* on_complete = nullptr;
/// Will be set to the selected subchannel, or nullptr on failure or when
/// the LB policy decides to drop the call.
RefCountedPtr<ConnectedSubchannel> connected_subchannel;
/// Will be populated with context to pass to the subchannel call, if
/// needed.
- grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
+ grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT] = {};
/// Upon success, \a *user_data will be set to whatever opaque information
/// may need to be propagated from the LB policy, or nullptr if not needed.
// TODO(roth): As part of revamping our metadata APIs, try to find a
// way to clean this up and C++-ify it.
- void** user_data;
+ void** user_data = nullptr;
/// Next pointer. For internal use by LB policy.
- PickState* next;
+ PickState* next = nullptr;
};
// Not copyable nor movable.
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
index cc259bcdbf..399bb452f4 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
@@ -37,16 +37,27 @@ static void destroy_channel_elem(grpc_channel_element* elem) {}
namespace {
struct call_data {
+ call_data(const grpc_call_element_args& args) {
+ if (args.context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) {
+ // Get stats object from context and take a ref.
+ client_stats = static_cast<grpc_core::GrpcLbClientStats*>(
+ args.context[GRPC_GRPCLB_CLIENT_STATS].value)
+ ->Ref();
+ // Record call started.
+ client_stats->AddCallStarted();
+ }
+ }
+
// Stats object to update.
grpc_core::RefCountedPtr<grpc_core::GrpcLbClientStats> client_stats;
// State for intercepting send_initial_metadata.
grpc_closure on_complete_for_send;
grpc_closure* original_on_complete_for_send;
- bool send_initial_metadata_succeeded;
+ bool send_initial_metadata_succeeded = false;
// State for intercepting recv_initial_metadata.
grpc_closure recv_initial_metadata_ready;
grpc_closure* original_recv_initial_metadata_ready;
- bool recv_initial_metadata_succeeded;
+ bool recv_initial_metadata_succeeded = false;
};
} // namespace
@@ -70,16 +81,8 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- // Get stats object from context and take a ref.
GPR_ASSERT(args->context != nullptr);
- if (args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) {
- calld->client_stats = static_cast<grpc_core::GrpcLbClientStats*>(
- args->context[GRPC_GRPCLB_CLIENT_STATS].value)
- ->Ref();
- // Record call started.
- calld->client_stats->AddCallStarted();
- }
+ new (elem->call_data) call_data(*args);
return GRPC_ERROR_NONE;
}
@@ -97,6 +100,7 @@ static void destroy_call_elem(grpc_call_element* elem,
// TODO(roth): Eliminate this once filter stack is converted to C++.
calld->client_stats.reset();
}
+ calld->~call_data();
}
static void start_transport_stream_op_batch(
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 340fa0c464..a56db0201b 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -162,12 +162,16 @@ struct grpc_subchannel {
};
struct grpc_subchannel_call {
+ grpc_subchannel_call(grpc_core::ConnectedSubchannel* connection,
+ const grpc_core::ConnectedSubchannel::CallArgs& args)
+ : connection(connection), deadline(args.deadline) {}
+
grpc_core::ConnectedSubchannel* connection;
- grpc_closure* schedule_closure_after_destroy;
+ grpc_closure* schedule_closure_after_destroy = nullptr;
// state needed to support channelz interception of recv trailing metadata.
grpc_closure recv_trailing_metadata_ready;
grpc_closure* original_recv_trailing_metadata;
- grpc_metadata_batch* recv_trailing_metadata;
+ grpc_metadata_batch* recv_trailing_metadata = nullptr;
grpc_millis deadline;
};
@@ -905,6 +909,7 @@ static void subchannel_call_destroy(void* call, grpc_error* error) {
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr,
c->schedule_closure_after_destroy);
connection->Unref(DEBUG_LOCATION, "subchannel_call");
+ c->~grpc_subchannel_call();
}
void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call,
@@ -1102,14 +1107,12 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
grpc_subchannel_call** call) {
const size_t allocation_size =
GetInitialCallSizeEstimate(args.parent_data_size);
- *call = static_cast<grpc_subchannel_call*>(
- gpr_arena_alloc(args.arena, allocation_size));
+ *call = new (gpr_arena_alloc(args.arena, allocation_size))
+ grpc_subchannel_call(this, args);
grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
RefCountedPtr<ConnectedSubchannel> connection =
Ref(DEBUG_LOCATION, "subchannel_call");
connection.release(); // Ref is passed to the grpc_subchannel_call object.
- (*call)->connection = this;
- (*call)->deadline = args.deadline;
const grpc_call_element_args call_args = {
callstk, /* call_stack */
nullptr, /* server_transport_data */
diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc
index d23ad67ad5..b4cb07f0f9 100644
--- a/src/core/ext/filters/deadline/deadline_filter.cc
+++ b/src/core/ext/filters/deadline/deadline_filter.cc
@@ -27,6 +27,7 @@
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_stack_builder.h"
+#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel_init.h"
@@ -152,7 +153,11 @@ static void inject_recv_trailing_metadata_ready(
// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
- bool in_call_combiner;
+ start_timer_after_init_state(grpc_call_element* elem, grpc_millis deadline)
+ : elem(elem), deadline(deadline) {}
+ ~start_timer_after_init_state() { start_timer_if_needed(elem, deadline); }
+
+ bool in_call_combiner = false;
grpc_call_element* elem;
grpc_millis deadline;
grpc_closure closure;
@@ -171,20 +176,16 @@ static void start_timer_after_init(void* arg, grpc_error* error) {
"scheduling deadline timer");
return;
}
- start_timer_if_needed(state->elem, state->deadline);
- gpr_free(state);
+ grpc_core::Delete(state);
GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
"done scheduling deadline timer");
}
-void grpc_deadline_state_init(grpc_call_element* elem,
- grpc_call_stack* call_stack,
- grpc_call_combiner* call_combiner,
- grpc_millis deadline) {
- grpc_deadline_state* deadline_state =
- static_cast<grpc_deadline_state*>(elem->call_data);
- deadline_state->call_stack = call_stack;
- deadline_state->call_combiner = call_combiner;
+grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
+ grpc_call_stack* call_stack,
+ grpc_call_combiner* call_combiner,
+ grpc_millis deadline)
+ : call_stack(call_stack), call_combiner(call_combiner) {
// Deadline will always be infinite on servers, so the timer will only be
// set on clients with a finite deadline.
if (deadline != GRPC_MILLIS_INF_FUTURE) {
@@ -196,21 +197,14 @@ void grpc_deadline_state_init(grpc_call_element* elem,
// create a closure to start the timer, and we schedule that closure
// to be run after call stack initialization is done.
struct start_timer_after_init_state* state =
- static_cast<struct start_timer_after_init_state*>(
- gpr_zalloc(sizeof(*state)));
- state->elem = elem;
- state->deadline = deadline;
+ grpc_core::New<start_timer_after_init_state>(elem, deadline);
GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&state->closure, GRPC_ERROR_NONE);
}
}
-void grpc_deadline_state_destroy(grpc_call_element* elem) {
- grpc_deadline_state* deadline_state =
- static_cast<grpc_deadline_state*>(elem->call_data);
- cancel_timer_if_needed(deadline_state);
-}
+grpc_deadline_state::~grpc_deadline_state() { cancel_timer_if_needed(this); }
void grpc_deadline_state_reset(grpc_call_element* elem,
grpc_millis new_deadline) {
@@ -269,8 +263,8 @@ typedef struct server_call_data {
// Constructor for call_data. Used for both client and server filters.
static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
- grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
- args->deadline);
+ new (elem->call_data) grpc_deadline_state(
+ elem, args->call_stack, args->call_combiner, args->deadline);
return GRPC_ERROR_NONE;
}
@@ -278,7 +272,9 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
- grpc_deadline_state_destroy(elem);
+ grpc_deadline_state* deadline_state =
+ static_cast<grpc_deadline_state*>(elem->call_data);
+ deadline_state->~grpc_deadline_state();
}
// Method for starting a call op for client filter.
diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h
index 1d797f445a..e37032999c 100644
--- a/src/core/ext/filters/deadline/deadline_filter.h
+++ b/src/core/ext/filters/deadline/deadline_filter.h
@@ -22,19 +22,23 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/timer.h"
-typedef enum grpc_deadline_timer_state {
+enum grpc_deadline_timer_state {
GRPC_DEADLINE_STATE_INITIAL,
GRPC_DEADLINE_STATE_PENDING,
GRPC_DEADLINE_STATE_FINISHED
-} grpc_deadline_timer_state;
+};
// State used for filters that enforce call deadlines.
// Must be the first field in the filter's call_data.
-typedef struct grpc_deadline_state {
+struct grpc_deadline_state {
+ grpc_deadline_state(grpc_call_element* elem, grpc_call_stack* call_stack,
+ grpc_call_combiner* call_combiner, grpc_millis deadline);
+ ~grpc_deadline_state();
+
// We take a reference to the call stack for the timer callback.
grpc_call_stack* call_stack;
grpc_call_combiner* call_combiner;
- grpc_deadline_timer_state timer_state;
+ grpc_deadline_timer_state timer_state = GRPC_DEADLINE_STATE_INITIAL;
grpc_timer timer;
grpc_closure timer_callback;
// Closure to invoke when we receive trailing metadata.
@@ -43,21 +47,13 @@ typedef struct grpc_deadline_state {
// The original recv_trailing_metadata_ready closure, which we chain to
// after our own closure is invoked.
grpc_closure* original_recv_trailing_metadata_ready;
-} grpc_deadline_state;
+};
//
// NOTE: All of these functions require that the first field in
// elem->call_data is a grpc_deadline_state.
//
-// assumes elem->call_data is zero'd
-void grpc_deadline_state_init(grpc_call_element* elem,
- grpc_call_stack* call_stack,
- grpc_call_combiner* call_combiner,
- grpc_millis deadline);
-
-void grpc_deadline_state_destroy(grpc_call_element* elem);
-
// Cancels the existing timer and starts a new one with new_deadline.
//
// Note: It is generally safe to call this with an earlier deadline
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index cd459e47cd..bf9a01f659 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -37,10 +37,31 @@
#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
/* default maximum size of payload eligable for GET request */
-static const size_t kMaxPayloadSizeForGet = 2048;
+static constexpr size_t kMaxPayloadSizeForGet = 2048;
+
+static void recv_initial_metadata_ready(void* user_data, grpc_error* error);
+static void recv_trailing_metadata_ready(void* user_data, grpc_error* error);
+static void on_send_message_next_done(void* arg, grpc_error* error);
+static void send_message_on_complete(void* arg, grpc_error* error);
namespace {
struct call_data {
+ call_data(grpc_call_element* elem, const grpc_call_element_args& args)
+ : call_combiner(args.call_combiner) {
+ GRPC_CLOSURE_INIT(&recv_initial_metadata_ready,
+ ::recv_initial_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
+ ::recv_trailing_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&on_send_message_next_done, ::on_send_message_next_done,
+ elem, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&send_message_on_complete, ::send_message_on_complete,
+ elem, grpc_schedule_on_exec_ctx);
+ }
+
+ ~call_data() { GRPC_ERROR_UNREF(recv_initial_metadata_error); }
+
grpc_call_combiner* call_combiner;
// State for handling send_initial_metadata ops.
grpc_linked_mdelem method;
@@ -51,18 +72,18 @@ struct call_data {
grpc_linked_mdelem user_agent;
// State for handling recv_initial_metadata ops.
grpc_metadata_batch* recv_initial_metadata;
- grpc_error* recv_initial_metadata_error;
- grpc_closure* original_recv_initial_metadata_ready;
+ grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
+ grpc_closure* original_recv_initial_metadata_ready = nullptr;
grpc_closure recv_initial_metadata_ready;
// State for handling recv_trailing_metadata ops.
grpc_metadata_batch* recv_trailing_metadata;
grpc_closure* original_recv_trailing_metadata_ready;
grpc_closure recv_trailing_metadata_ready;
- grpc_error* recv_trailing_metadata_error;
- bool seen_recv_trailing_metadata_ready;
+ grpc_error* recv_trailing_metadata_error = GRPC_ERROR_NONE;
+ bool seen_recv_trailing_metadata_ready = false;
// State for handling send_message ops.
grpc_transport_stream_op_batch* send_message_batch;
- size_t send_message_bytes_read;
+ size_t send_message_bytes_read = 0;
grpc_core::ManualConstructor<grpc_core::ByteStreamCache> send_message_cache;
grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
send_message_caching_stream;
@@ -442,18 +463,7 @@ done:
/* Constructor for call_data */
static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- calld->call_combiner = args->call_combiner;
- GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
- recv_initial_metadata_ready, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
- recv_trailing_metadata_ready, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
- elem, grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
- on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
+ new (elem->call_data) call_data(elem, *args);
return GRPC_ERROR_NONE;
}
@@ -462,7 +472,7 @@ static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
call_data* calld = static_cast<call_data*>(elem->call_data);
- GRPC_ERROR_UNREF(calld->recv_initial_metadata_error);
+ calld->~call_data();
}
static grpc_mdelem scheme_from_args(const grpc_channel_args* args) {
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc
index 933fe3c77b..9c8c8d9e18 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc
@@ -39,6 +39,10 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/transport/static_metadata.h"
+static void start_send_message_batch(void* arg, grpc_error* unused);
+static void send_message_on_complete(void* arg, grpc_error* error);
+static void on_send_message_next_done(void* arg, grpc_error* error);
+
namespace {
enum initial_metadata_state {
// Initial metadata not yet seen.
@@ -50,6 +54,23 @@ enum initial_metadata_state {
};
struct call_data {
+ call_data(grpc_call_element* elem, const grpc_call_element_args& args)
+ : call_combiner(args.call_combiner) {
+ GRPC_CLOSURE_INIT(&start_send_message_batch_in_call_combiner,
+ start_send_message_batch, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_slice_buffer_init(&slices);
+ GRPC_CLOSURE_INIT(&send_message_on_complete, ::send_message_on_complete,
+ elem, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&on_send_message_next_done, ::on_send_message_next_done,
+ elem, grpc_schedule_on_exec_ctx);
+ }
+
+ ~call_data() {
+ grpc_slice_buffer_destroy_internal(&slices);
+ GRPC_ERROR_UNREF(cancel_error);
+ }
+
grpc_call_combiner* call_combiner;
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem stream_compression_algorithm_storage;
@@ -57,11 +78,12 @@ struct call_data {
grpc_linked_mdelem accept_stream_encoding_storage;
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
- grpc_message_compression_algorithm message_compression_algorithm;
- initial_metadata_state send_initial_metadata_state;
- grpc_error* cancel_error;
+ grpc_message_compression_algorithm message_compression_algorithm =
+ GRPC_MESSAGE_COMPRESS_NONE;
+ initial_metadata_state send_initial_metadata_state = INITIAL_METADATA_UNSEEN;
+ grpc_error* cancel_error = GRPC_ERROR_NONE;
grpc_closure start_send_message_batch_in_call_combiner;
- grpc_transport_stream_op_batch* send_message_batch;
+ grpc_transport_stream_op_batch* send_message_batch = nullptr;
grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream>
replacement_stream;
@@ -424,16 +446,7 @@ static void compress_start_transport_stream_op_batch(
/* Constructor for call_data */
static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- calld->call_combiner = args->call_combiner;
- calld->cancel_error = GRPC_ERROR_NONE;
- grpc_slice_buffer_init(&calld->slices);
- GRPC_CLOSURE_INIT(&calld->start_send_message_batch_in_call_combiner,
- start_send_message_batch, elem, grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
- on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
- elem, grpc_schedule_on_exec_ctx);
+ new (elem->call_data) call_data(elem, *args);
return GRPC_ERROR_NONE;
}
@@ -442,8 +455,7 @@ static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
call_data* calld = static_cast<call_data*>(elem->call_data);
- grpc_slice_buffer_destroy_internal(&calld->slices);
- GRPC_ERROR_UNREF(calld->cancel_error);
+ calld->~call_data();
}
/* Constructor for channel_data */
diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc
index 436ea09d94..ce1be8370c 100644
--- a/src/core/ext/filters/http/server/http_server_filter.cc
+++ b/src/core/ext/filters/http/server/http_server_filter.cc
@@ -35,9 +35,32 @@
#define EXPECTED_CONTENT_TYPE "application/grpc"
#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
+static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err);
+static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err);
+static void hs_recv_message_ready(void* user_data, grpc_error* err);
+
namespace {
struct call_data {
+ call_data(grpc_call_element* elem, const grpc_call_element_args& args)
+ : call_combiner(args.call_combiner) {
+ GRPC_CLOSURE_INIT(&recv_initial_metadata_ready,
+ hs_recv_initial_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&recv_message_ready, hs_recv_message_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
+ hs_recv_trailing_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ }
+
+ ~call_data() {
+ GRPC_ERROR_UNREF(recv_initial_metadata_ready_error);
+ if (have_read_stream) {
+ read_stream->Orphan();
+ }
+ }
+
grpc_call_combiner* call_combiner;
// Outgoing headers to add to send_initial_metadata.
@@ -47,27 +70,27 @@ struct call_data {
// If we see the recv_message contents in the GET query string, we
// store it here.
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> read_stream;
- bool have_read_stream;
+ bool have_read_stream = false;
// State for intercepting recv_initial_metadata.
grpc_closure recv_initial_metadata_ready;
- grpc_error* recv_initial_metadata_ready_error;
+ grpc_error* recv_initial_metadata_ready_error = GRPC_ERROR_NONE;
grpc_closure* original_recv_initial_metadata_ready;
- grpc_metadata_batch* recv_initial_metadata;
+ grpc_metadata_batch* recv_initial_metadata = nullptr;
uint32_t* recv_initial_metadata_flags;
- bool seen_recv_initial_metadata_ready;
+ bool seen_recv_initial_metadata_ready = false;
// State for intercepting recv_message.
grpc_closure* original_recv_message_ready;
grpc_closure recv_message_ready;
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
- bool seen_recv_message_ready;
+ bool seen_recv_message_ready = false;
// State for intercepting recv_trailing_metadata
grpc_closure recv_trailing_metadata_ready;
grpc_closure* original_recv_trailing_metadata_ready;
grpc_error* recv_trailing_metadata_ready_error;
- bool seen_recv_trailing_metadata_ready;
+ bool seen_recv_trailing_metadata_ready = false;
};
struct channel_data {
@@ -431,16 +454,7 @@ static void hs_start_transport_stream_op_batch(
/* Constructor for call_data */
static grpc_error* hs_init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- calld->call_combiner = args->call_combiner;
- GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
- hs_recv_initial_metadata_ready, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->recv_message_ready, hs_recv_message_ready, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
- hs_recv_trailing_metadata_ready, elem,
- grpc_schedule_on_exec_ctx);
+ new (elem->call_data) call_data(elem, *args);
return GRPC_ERROR_NONE;
}
@@ -449,10 +463,7 @@ static void hs_destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
call_data* calld = static_cast<call_data*>(elem->call_data);
- GRPC_ERROR_UNREF(calld->recv_initial_metadata_ready_error);
- if (calld->have_read_stream) {
- calld->read_stream->Orphan();
- }
+ calld->~call_data();
}
/* Constructor for channel_data */
diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc
index 2d3b16d992..94d6942aa4 100644
--- a/src/core/ext/filters/message_size/message_size_filter.cc
+++ b/src/core/ext/filters/message_size/message_size_filter.cc
@@ -90,9 +90,53 @@ RefCountedPtr<MessageSizeLimits> MessageSizeLimits::CreateFromJson(
} // namespace
} // namespace grpc_core
+static void recv_message_ready(void* user_data, grpc_error* error);
+static void recv_trailing_metadata_ready(void* user_data, grpc_error* error);
+
namespace {
+struct channel_data {
+ message_size_limits limits;
+ // Maps path names to refcounted_message_size_limits structs.
+ grpc_core::RefCountedPtr<grpc_core::SliceHashTable<
+ grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits>>>
+ method_limit_table;
+};
+
struct call_data {
+ call_data(grpc_call_element* elem, const channel_data& chand,
+ const grpc_call_element_args& args)
+ : call_combiner(args.call_combiner), limits(chand.limits) {
+ GRPC_CLOSURE_INIT(&recv_message_ready, ::recv_message_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
+ ::recv_trailing_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ // Get max sizes from channel data, then merge in per-method config values.
+ // Note: Per-method config is only available on the client, so we
+ // apply the max request size to the send limit and the max response
+ // size to the receive limit.
+ if (chand.method_limit_table != nullptr) {
+ grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits> limits =
+ grpc_core::ServiceConfig::MethodConfigTableLookup(
+ *chand.method_limit_table, args.path);
+ if (limits != nullptr) {
+ if (limits->limits().max_send_size >= 0 &&
+ (limits->limits().max_send_size < this->limits.max_send_size ||
+ this->limits.max_send_size < 0)) {
+ this->limits.max_send_size = limits->limits().max_send_size;
+ }
+ if (limits->limits().max_recv_size >= 0 &&
+ (limits->limits().max_recv_size < this->limits.max_recv_size ||
+ this->limits.max_recv_size < 0)) {
+ this->limits.max_recv_size = limits->limits().max_recv_size;
+ }
+ }
+ }
+ }
+
+ ~call_data() { GRPC_ERROR_UNREF(error); }
+
grpc_call_combiner* call_combiner;
message_size_limits limits;
// Receive closures are chained: we inject this closure as the
@@ -101,25 +145,17 @@ struct call_data {
grpc_closure recv_message_ready;
grpc_closure recv_trailing_metadata_ready;
// The error caused by a message that is too large, or GRPC_ERROR_NONE
- grpc_error* error;
+ grpc_error* error = GRPC_ERROR_NONE;
// Used by recv_message_ready.
- grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message = nullptr;
// Original recv_message_ready callback, invoked after our own.
- grpc_closure* next_recv_message_ready;
+ grpc_closure* next_recv_message_ready = nullptr;
// Original recv_trailing_metadata callback, invoked after our own.
grpc_closure* original_recv_trailing_metadata_ready;
- bool seen_recv_trailing_metadata;
+ bool seen_recv_trailing_metadata = false;
grpc_error* recv_trailing_metadata_error;
};
-struct channel_data {
- message_size_limits limits;
- // Maps path names to refcounted_message_size_limits structs.
- grpc_core::RefCountedPtr<grpc_core::SliceHashTable<
- grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits>>>
- method_limit_table;
-};
-
} // namespace
// Callback invoked when we receive a message. Here we check the max
@@ -228,38 +264,7 @@ static void start_transport_stream_op_batch(
static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- calld->call_combiner = args->call_combiner;
- calld->next_recv_message_ready = nullptr;
- calld->original_recv_trailing_metadata_ready = nullptr;
- calld->error = GRPC_ERROR_NONE;
- GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
- recv_trailing_metadata_ready, elem,
- grpc_schedule_on_exec_ctx);
- // Get max sizes from channel data, then merge in per-method config values.
- // Note: Per-method config is only available on the client, so we
- // apply the max request size to the send limit and the max response
- // size to the receive limit.
- calld->limits = chand->limits;
- if (chand->method_limit_table != nullptr) {
- grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits> limits =
- grpc_core::ServiceConfig::MethodConfigTableLookup(
- *chand->method_limit_table, args->path);
- if (limits != nullptr) {
- if (limits->limits().max_send_size >= 0 &&
- (limits->limits().max_send_size < calld->limits.max_send_size ||
- calld->limits.max_send_size < 0)) {
- calld->limits.max_send_size = limits->limits().max_send_size;
- }
- if (limits->limits().max_recv_size >= 0 &&
- (limits->limits().max_recv_size < calld->limits.max_recv_size ||
- calld->limits.max_recv_size < 0)) {
- calld->limits.max_recv_size = limits->limits().max_recv_size;
- }
- }
- }
+ new (elem->call_data) call_data(elem, *chand, *args);
return GRPC_ERROR_NONE;
}
@@ -268,7 +273,7 @@ static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
call_data* calld = (call_data*)elem->call_data;
- GRPC_ERROR_UNREF(calld->error);
+ calld->~call_data();
}
static int default_size(const grpc_channel_args* args,