diff options
author | Soheil Hassas Yeganeh <soheil@google.com> | 2018-11-02 15:20:02 -0400 |
---|---|---|
committer | Soheil Hassas Yeganeh <soheil@google.com> | 2018-11-05 10:12:39 -0500 |
commit | 48e4a81b05f2ad6541d72e819cd4f638055f13d5 (patch) | |
tree | 8d5b64b7113721afb2eb4a0363cbd3fd7e47ff41 /src/core/ext/filters | |
parent | 5e6c4491bf60aa91bd3e4fed3c8203601a4c795e (diff) |
Remeve memset(0) from arena allocated memory.
Callers are updated to properly initialize the memory.
This behavior can be overridden using GRPC_ARENA_INIT_STRATEGY
environment variable.
Diffstat (limited to 'src/core/ext/filters')
12 files changed, 352 insertions, 280 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 58db7a05b4..d42961fc60 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -938,12 +938,26 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { // (census filter is on top of this one) // - add census stats for retries +namespace { +struct call_data; + // State used for starting a retryable batch on a subchannel call. // This provides its own grpc_transport_stream_op_batch and other data // structures needed to populate the ops in the batch. // We allocate one struct on the arena for each attempt at starting a // batch on a given subchannel call. -typedef struct { +struct subchannel_batch_data { + subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount, + bool set_on_complete); + // All dtor code must be added in `destroy`. This is because we may + // call closures in `subchannel_batch_data` after they are unrefed by + // `batch_data_unref`, and msan would complain about accessing this class + // after calling dtor. As a result we cannot call the `dtor` in + // `batch_data_unref`. + // TODO(soheil): We should try to call the dtor in `batch_data_unref`. + ~subchannel_batch_data() { destroy(); } + void destroy(); + gpr_refcount refs; grpc_call_element* elem; grpc_subchannel_call* subchannel_call; // Holds a ref. @@ -952,11 +966,23 @@ typedef struct { grpc_transport_stream_op_batch batch; // For intercepting on_complete. grpc_closure on_complete; -} subchannel_batch_data; +}; // Retry state associated with a subchannel call. // Stored in the parent_data of the subchannel call object. -typedef struct { +struct subchannel_call_retry_state { + explicit subchannel_call_retry_state(grpc_call_context_element* context) + : batch_payload(context), + started_send_initial_metadata(false), + completed_send_initial_metadata(false), + started_send_trailing_metadata(false), + completed_send_trailing_metadata(false), + started_recv_initial_metadata(false), + completed_recv_initial_metadata(false), + started_recv_trailing_metadata(false), + completed_recv_trailing_metadata(false), + retry_dispatched(false) {} + // subchannel_batch_data.batch.payload points to this. grpc_transport_stream_op_batch_payload batch_payload; // For send_initial_metadata. @@ -975,7 +1001,7 @@ typedef struct { // For intercepting recv_initial_metadata. grpc_metadata_batch recv_initial_metadata; grpc_closure recv_initial_metadata_ready; - bool trailing_metadata_available; + bool trailing_metadata_available = false; // For intercepting recv_message. grpc_closure recv_message_ready; grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message; @@ -985,10 +1011,10 @@ typedef struct { grpc_closure recv_trailing_metadata_ready; // These fields indicate which ops have been started and completed on // this subchannel call. - size_t started_send_message_count; - size_t completed_send_message_count; - size_t started_recv_message_count; - size_t completed_recv_message_count; + size_t started_send_message_count = 0; + size_t completed_send_message_count = 0; + size_t started_recv_message_count = 0; + size_t completed_recv_message_count = 0; bool started_send_initial_metadata : 1; bool completed_send_initial_metadata : 1; bool started_send_trailing_metadata : 1; @@ -999,12 +1025,12 @@ typedef struct { bool completed_recv_trailing_metadata : 1; // State for callback processing. bool retry_dispatched : 1; - subchannel_batch_data* recv_initial_metadata_ready_deferred_batch; - grpc_error* recv_initial_metadata_error; - subchannel_batch_data* recv_message_ready_deferred_batch; - grpc_error* recv_message_error; - subchannel_batch_data* recv_trailing_metadata_internal_batch; -} subchannel_call_retry_state; + subchannel_batch_data* recv_initial_metadata_ready_deferred_batch = nullptr; + grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE; + subchannel_batch_data* recv_message_ready_deferred_batch = nullptr; + grpc_error* recv_message_error = GRPC_ERROR_NONE; + subchannel_batch_data* recv_trailing_metadata_internal_batch = nullptr; +}; // Pending batches stored in call data. typedef struct { @@ -1019,7 +1045,44 @@ typedef struct { Handles queueing of stream ops until a call object is ready, waiting for initial metadata before trying to create a call object, and handling cancellation gracefully. */ -typedef struct client_channel_call_data { +struct call_data { + call_data(grpc_call_element* elem, const channel_data& chand, + const grpc_call_element_args& args) + : deadline_state(elem, args.call_stack, args.call_combiner, + GPR_LIKELY(chand.deadline_checking_enabled) + ? args.deadline + : GRPC_MILLIS_INF_FUTURE), + path(grpc_slice_ref_internal(args.path)), + call_start_time(args.start_time), + deadline(args.deadline), + arena(args.arena), + owning_call(args.call_stack), + call_combiner(args.call_combiner), + pending_send_initial_metadata(false), + pending_send_message(false), + pending_send_trailing_metadata(false), + enable_retries(chand.enable_retries), + retry_committed(false), + last_attempt_got_server_pushback(false) {} + + ~call_data() { + if (GPR_LIKELY(subchannel_call != nullptr)) { + GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, + "client_channel_destroy_call"); + } + grpc_slice_unref_internal(path); + GRPC_ERROR_UNREF(cancel_error); + for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) { + GPR_ASSERT(pending_batches[i].batch == nullptr); + } + for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { + if (pick.subchannel_call_context[i].value != nullptr) { + pick.subchannel_call_context[i].destroy( + pick.subchannel_call_context[i].value); + } + } + } + // State for handling deadlines. // The code in deadline_filter.c requires this to be the first field. // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state @@ -1038,24 +1101,24 @@ typedef struct client_channel_call_data { grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params; - grpc_subchannel_call* subchannel_call; + grpc_subchannel_call* subchannel_call = nullptr; // Set when we get a cancel_stream op. - grpc_error* cancel_error; + grpc_error* cancel_error = GRPC_ERROR_NONE; grpc_core::LoadBalancingPolicy::PickState pick; grpc_closure pick_closure; grpc_closure pick_cancel_closure; - grpc_polling_entity* pollent; - bool pollent_added_to_interested_parties; + grpc_polling_entity* pollent = nullptr; + bool pollent_added_to_interested_parties = false; // Batches are added to this list when received from above. // They are removed when we are done handling the batch (i.e., when // either we have invoked all of the batch's callbacks or we have // passed the batch down to the subchannel call and are not // intercepting any of its callbacks). - pending_batch pending_batches[MAX_PENDING_BATCHES]; + pending_batch pending_batches[MAX_PENDING_BATCHES] = {}; bool pending_send_initial_metadata : 1; bool pending_send_message : 1; bool pending_send_trailing_metadata : 1; @@ -1064,8 +1127,8 @@ typedef struct client_channel_call_data { bool enable_retries : 1; bool retry_committed : 1; bool last_attempt_got_server_pushback : 1; - int num_attempts_completed; - size_t bytes_buffered_for_retry; + int num_attempts_completed = 0; + size_t bytes_buffered_for_retry = 0; grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff; grpc_timer retry_timer; @@ -1076,12 +1139,12 @@ typedef struct client_channel_call_data { // until all of these batches have completed. // Note that we actually only need to track replay batches, but it's // easier to track all batches with send ops. - int num_pending_retriable_subchannel_send_batches; + int num_pending_retriable_subchannel_send_batches = 0; // Cached data for retrying send ops. // send_initial_metadata - bool seen_send_initial_metadata; - grpc_linked_mdelem* send_initial_metadata_storage; + bool seen_send_initial_metadata = false; + grpc_linked_mdelem* send_initial_metadata_storage = nullptr; grpc_metadata_batch send_initial_metadata; uint32_t send_initial_metadata_flags; gpr_atm* peer_string; @@ -1092,14 +1155,13 @@ typedef struct client_channel_call_data { // Note: We inline the cache for the first 3 send_message ops and use // dynamic allocation after that. This number was essentially picked // at random; it could be changed in the future to tune performance. - grpc_core::ManualConstructor< - grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>> - send_messages; + grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages; // send_trailing_metadata - bool seen_send_trailing_metadata; - grpc_linked_mdelem* send_trailing_metadata_storage; + bool seen_send_trailing_metadata = false; + grpc_linked_mdelem* send_trailing_metadata_storage = nullptr; grpc_metadata_batch send_trailing_metadata; -} call_data; +}; +} // namespace // Forward declarations. static void retry_commit(grpc_call_element* elem, @@ -1143,7 +1205,7 @@ static void maybe_cache_send_ops_for_batch(call_data* calld, gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); new (cache) grpc_core::ByteStreamCache( std::move(batch->payload->send_message.send_message)); - calld->send_messages->push_back(cache); + calld->send_messages.push_back(cache); } // Save metadata batch for send_trailing_metadata ops. if (batch->send_trailing_metadata) { @@ -1180,7 +1242,7 @@ static void free_cached_send_message(channel_data* chand, call_data* calld, "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]", chand, calld, idx); } - (*calld->send_messages)[idx]->Destroy(); + calld->send_messages[idx]->Destroy(); } // Frees cached send_trailing_metadata. @@ -1650,55 +1712,66 @@ static bool maybe_retry(grpc_call_element* elem, // subchannel_batch_data // -// Creates a subchannel_batch_data object on the call's arena with the -// specified refcount. If set_on_complete is true, the batch's -// on_complete callback will be set to point to on_complete(); -// otherwise, the batch's on_complete callback will be null. -static subchannel_batch_data* batch_data_create(grpc_call_element* elem, - int refcount, - bool set_on_complete) { - call_data* calld = static_cast<call_data*>(elem->call_data); +namespace { +subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem, + call_data* calld, int refcount, + bool set_on_complete) + : elem(elem), + subchannel_call(GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, + "batch_data_create")) { subchannel_call_retry_state* retry_state = static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); - subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>( - gpr_arena_alloc(calld->arena, sizeof(*batch_data))); - batch_data->elem = elem; - batch_data->subchannel_call = - GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create"); - batch_data->batch.payload = &retry_state->batch_payload; - gpr_ref_init(&batch_data->refs, refcount); + batch.payload = &retry_state->batch_payload; + gpr_ref_init(&refs, refcount); if (set_on_complete) { - GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data, + GRPC_CLOSURE_INIT(&on_complete, ::on_complete, this, grpc_schedule_on_exec_ctx); - batch_data->batch.on_complete = &batch_data->on_complete; + batch.on_complete = &on_complete; } GRPC_CALL_STACK_REF(calld->owning_call, "batch_data"); +} + +void subchannel_batch_data::destroy() { + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data(subchannel_call)); + if (batch.send_initial_metadata) { + grpc_metadata_batch_destroy(&retry_state->send_initial_metadata); + } + if (batch.send_trailing_metadata) { + grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata); + } + if (batch.recv_initial_metadata) { + grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata); + } + if (batch.recv_trailing_metadata) { + grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata); + } + GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "batch_data_unref"); + call_data* calld = static_cast<call_data*>(elem->call_data); + GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); +} +} // namespace + +// Creates a subchannel_batch_data object on the call's arena with the +// specified refcount. If set_on_complete is true, the batch's +// on_complete callback will be set to point to on_complete(); +// otherwise, the batch's on_complete callback will be null. +static subchannel_batch_data* batch_data_create(grpc_call_element* elem, + int refcount, + bool set_on_complete) { + call_data* calld = static_cast<call_data*>(elem->call_data); + subchannel_batch_data* batch_data = + new (gpr_arena_alloc(calld->arena, sizeof(*batch_data))) + subchannel_batch_data(elem, calld, refcount, set_on_complete); return batch_data; } static void batch_data_unref(subchannel_batch_data* batch_data) { if (gpr_unref(&batch_data->refs)) { - subchannel_call_retry_state* retry_state = - static_cast<subchannel_call_retry_state*>( - grpc_connected_subchannel_call_get_parent_data( - batch_data->subchannel_call)); - if (batch_data->batch.send_initial_metadata) { - grpc_metadata_batch_destroy(&retry_state->send_initial_metadata); - } - if (batch_data->batch.send_trailing_metadata) { - grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata); - } - if (batch_data->batch.recv_initial_metadata) { - grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata); - } - if (batch_data->batch.recv_trailing_metadata) { - grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata); - } - GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref"); - call_data* calld = static_cast<call_data*>(batch_data->elem->call_data); - GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); + batch_data->destroy(); } } @@ -1996,7 +2069,7 @@ static bool pending_batch_is_unstarted( return true; } if (pending->batch->send_message && - retry_state->started_send_message_count < calld->send_messages->size()) { + retry_state->started_send_message_count < calld->send_messages.size()) { return true; } if (pending->batch->send_trailing_metadata && @@ -2152,7 +2225,7 @@ static void add_closures_for_replay_or_pending_send_ops( channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); bool have_pending_send_message_ops = - retry_state->started_send_message_count < calld->send_messages->size(); + retry_state->started_send_message_count < calld->send_messages.size(); bool have_pending_send_trailing_metadata_op = calld->seen_send_trailing_metadata && !retry_state->started_send_trailing_metadata; @@ -2344,7 +2417,7 @@ static void add_retriable_send_message_op( chand, calld, retry_state->started_send_message_count); } grpc_core::ByteStreamCache* cache = - (*calld->send_messages)[retry_state->started_send_message_count]; + calld->send_messages[retry_state->started_send_message_count]; ++retry_state->started_send_message_count; retry_state->send_message.Init(cache); batch_data->batch.send_message = true; @@ -2476,7 +2549,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( } // send_message. // Note that we can only have one send_message op in flight at a time. - if (retry_state->started_send_message_count < calld->send_messages->size() && + if (retry_state->started_send_message_count < calld->send_messages.size() && retry_state->started_send_message_count == retry_state->completed_send_message_count && !calld->pending_send_message) { @@ -2497,7 +2570,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( // to start, since we can't send down any more send_message ops after // send_trailing_metadata. if (calld->seen_send_trailing_metadata && - retry_state->started_send_message_count == calld->send_messages->size() && + retry_state->started_send_message_count == calld->send_messages.size() && !retry_state->started_send_trailing_metadata && !calld->pending_send_trailing_metadata) { if (grpc_client_channel_trace.enabled()) { @@ -2549,7 +2622,7 @@ static void add_subchannel_batches_for_pending_batches( // send_message ops after send_trailing_metadata. if (batch->send_trailing_metadata && (retry_state->started_send_message_count + batch->send_message < - calld->send_messages->size() || + calld->send_messages.size() || retry_state->started_send_trailing_metadata)) { continue; } @@ -2716,11 +2789,9 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { pending_batches_fail(elem, new_error, true /* yield_call_combiner */); } else { if (parent_data_size > 0) { - subchannel_call_retry_state* retry_state = - static_cast<subchannel_call_retry_state*>( - grpc_connected_subchannel_call_get_parent_data( - calld->subchannel_call)); - retry_state->batch_payload.context = calld->pick.subchannel_call_context; + new (grpc_connected_subchannel_call_get_parent_data( + calld->subchannel_call)) + subchannel_call_retry_state(calld->pick.subchannel_call_context); } pending_batches_resume(elem); } @@ -3260,21 +3331,8 @@ static void cc_start_transport_stream_op_batch( /* Constructor for call_data */ static grpc_error* cc_init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { - call_data* calld = static_cast<call_data*>(elem->call_data); channel_data* chand = static_cast<channel_data*>(elem->channel_data); - // Initialize data members. - calld->path = grpc_slice_ref_internal(args->path); - calld->call_start_time = args->start_time; - calld->deadline = args->deadline; - calld->arena = args->arena; - calld->owning_call = args->call_stack; - calld->call_combiner = args->call_combiner; - if (GPR_LIKELY(chand->deadline_checking_enabled)) { - grpc_deadline_state_init(elem, args->call_stack, args->call_combiner, - calld->deadline); - } - calld->enable_retries = chand->enable_retries; - calld->send_messages.Init(); + new (elem->call_data) call_data(elem, *chand, *args); return GRPC_ERROR_NONE; } @@ -3283,34 +3341,12 @@ static void cc_destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* then_schedule_closure) { call_data* calld = static_cast<call_data*>(elem->call_data); - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - if (GPR_LIKELY(chand->deadline_checking_enabled)) { - grpc_deadline_state_destroy(elem); - } - grpc_slice_unref_internal(calld->path); - calld->retry_throttle_data.reset(); - calld->method_params.reset(); - GRPC_ERROR_UNREF(calld->cancel_error); if (GPR_LIKELY(calld->subchannel_call != nullptr)) { grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call, then_schedule_closure); then_schedule_closure = nullptr; - GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, - "client_channel_destroy_call"); - } - for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { - GPR_ASSERT(calld->pending_batches[i].batch == nullptr); - } - if (GPR_LIKELY(calld->pick.connected_subchannel != nullptr)) { - calld->pick.connected_subchannel.reset(); - } - for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { - if (calld->pick.subchannel_call_context[i].value != nullptr) { - calld->pick.subchannel_call_context[i].destroy( - calld->pick.subchannel_call_context[i].value); - } } - calld->send_messages.Destroy(); + calld->~call_data(); GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); } diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index 591637aa86..587919596f 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -286,10 +286,9 @@ HealthCheckClient::CallState::CallState( health_check_client_(std::move(health_check_client)), pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)), arena_(gpr_arena_create(health_check_client_->connected_subchannel_ - ->GetInitialCallSizeEstimate(0))) { - memset(&call_combiner_, 0, sizeof(call_combiner_)); + ->GetInitialCallSizeEstimate(0))), + payload_(context_) { grpc_call_combiner_init(&call_combiner_); - memset(context_, 0, sizeof(context_)); gpr_atm_rel_store(&seen_response_, static_cast<gpr_atm>(0)); } diff --git a/src/core/ext/filters/client_channel/health/health_check_client.h b/src/core/ext/filters/client_channel/health/health_check_client.h index 7f77348f18..f6babef7d6 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.h +++ b/src/core/ext/filters/client_channel/health/health_check_client.h @@ -97,7 +97,7 @@ class HealthCheckClient gpr_arena* arena_; grpc_call_combiner call_combiner_; - grpc_call_context_element context_[GRPC_CONTEXT_COUNT]; + grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {}; // The streaming call to the backend. Always non-NULL. grpc_subchannel_call* call_; diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 21f80b7b94..b0040457a6 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -63,29 +63,29 @@ class LoadBalancingPolicy /// State used for an LB pick. struct PickState { /// Initial metadata associated with the picking call. - grpc_metadata_batch* initial_metadata; + grpc_metadata_batch* initial_metadata = nullptr; /// Bitmask used for selective cancelling. See /// \a CancelMatchingPicksLocked() and \a GRPC_INITIAL_METADATA_* in /// grpc_types.h. - uint32_t initial_metadata_flags; + uint32_t initial_metadata_flags = 0; /// Storage for LB token in \a initial_metadata, or nullptr if not used. grpc_linked_mdelem lb_token_mdelem_storage; /// Closure to run when pick is complete, if not completed synchronously. /// If null, pick will fail if a result is not available synchronously. - grpc_closure* on_complete; + grpc_closure* on_complete = nullptr; /// Will be set to the selected subchannel, or nullptr on failure or when /// the LB policy decides to drop the call. RefCountedPtr<ConnectedSubchannel> connected_subchannel; /// Will be populated with context to pass to the subchannel call, if /// needed. - grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; + grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT] = {}; /// Upon success, \a *user_data will be set to whatever opaque information /// may need to be propagated from the LB policy, or nullptr if not needed. // TODO(roth): As part of revamping our metadata APIs, try to find a // way to clean this up and C++-ify it. - void** user_data; + void** user_data = nullptr; /// Next pointer. For internal use by LB policy. - PickState* next; + PickState* next = nullptr; }; // Not copyable nor movable. diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc index cc259bcdbf..399bb452f4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc @@ -37,16 +37,27 @@ static void destroy_channel_elem(grpc_channel_element* elem) {} namespace { struct call_data { + call_data(const grpc_call_element_args& args) { + if (args.context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) { + // Get stats object from context and take a ref. + client_stats = static_cast<grpc_core::GrpcLbClientStats*>( + args.context[GRPC_GRPCLB_CLIENT_STATS].value) + ->Ref(); + // Record call started. + client_stats->AddCallStarted(); + } + } + // Stats object to update. grpc_core::RefCountedPtr<grpc_core::GrpcLbClientStats> client_stats; // State for intercepting send_initial_metadata. grpc_closure on_complete_for_send; grpc_closure* original_on_complete_for_send; - bool send_initial_metadata_succeeded; + bool send_initial_metadata_succeeded = false; // State for intercepting recv_initial_metadata. grpc_closure recv_initial_metadata_ready; grpc_closure* original_recv_initial_metadata_ready; - bool recv_initial_metadata_succeeded; + bool recv_initial_metadata_succeeded = false; }; } // namespace @@ -70,16 +81,8 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { - call_data* calld = static_cast<call_data*>(elem->call_data); - // Get stats object from context and take a ref. GPR_ASSERT(args->context != nullptr); - if (args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) { - calld->client_stats = static_cast<grpc_core::GrpcLbClientStats*>( - args->context[GRPC_GRPCLB_CLIENT_STATS].value) - ->Ref(); - // Record call started. - calld->client_stats->AddCallStarted(); - } + new (elem->call_data) call_data(*args); return GRPC_ERROR_NONE; } @@ -97,6 +100,7 @@ static void destroy_call_elem(grpc_call_element* elem, // TODO(roth): Eliminate this once filter stack is converted to C++. calld->client_stats.reset(); } + calld->~call_data(); } static void start_transport_stream_op_batch( diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 340fa0c464..a56db0201b 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -162,12 +162,16 @@ struct grpc_subchannel { }; struct grpc_subchannel_call { + grpc_subchannel_call(grpc_core::ConnectedSubchannel* connection, + const grpc_core::ConnectedSubchannel::CallArgs& args) + : connection(connection), deadline(args.deadline) {} + grpc_core::ConnectedSubchannel* connection; - grpc_closure* schedule_closure_after_destroy; + grpc_closure* schedule_closure_after_destroy = nullptr; // state needed to support channelz interception of recv trailing metadata. grpc_closure recv_trailing_metadata_ready; grpc_closure* original_recv_trailing_metadata; - grpc_metadata_batch* recv_trailing_metadata; + grpc_metadata_batch* recv_trailing_metadata = nullptr; grpc_millis deadline; }; @@ -905,6 +909,7 @@ static void subchannel_call_destroy(void* call, grpc_error* error) { grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, c->schedule_closure_after_destroy); connection->Unref(DEBUG_LOCATION, "subchannel_call"); + c->~grpc_subchannel_call(); } void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call, @@ -1102,14 +1107,12 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, grpc_subchannel_call** call) { const size_t allocation_size = GetInitialCallSizeEstimate(args.parent_data_size); - *call = static_cast<grpc_subchannel_call*>( - gpr_arena_alloc(args.arena, allocation_size)); + *call = new (gpr_arena_alloc(args.arena, allocation_size)) + grpc_subchannel_call(this, args); grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); RefCountedPtr<ConnectedSubchannel> connection = Ref(DEBUG_LOCATION, "subchannel_call"); connection.release(); // Ref is passed to the grpc_subchannel_call object. - (*call)->connection = this; - (*call)->deadline = args.deadline; const grpc_call_element_args call_args = { callstk, /* call_stack */ nullptr, /* server_transport_data */ diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index d23ad67ad5..b4cb07f0f9 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -27,6 +27,7 @@ #include <grpc/support/time.h> #include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/channel_init.h" @@ -152,7 +153,11 @@ static void inject_recv_trailing_metadata_ready( // Callback and associated state for starting the timer after call stack // initialization has been completed. struct start_timer_after_init_state { - bool in_call_combiner; + start_timer_after_init_state(grpc_call_element* elem, grpc_millis deadline) + : elem(elem), deadline(deadline) {} + ~start_timer_after_init_state() { start_timer_if_needed(elem, deadline); } + + bool in_call_combiner = false; grpc_call_element* elem; grpc_millis deadline; grpc_closure closure; @@ -171,20 +176,16 @@ static void start_timer_after_init(void* arg, grpc_error* error) { "scheduling deadline timer"); return; } - start_timer_if_needed(state->elem, state->deadline); - gpr_free(state); + grpc_core::Delete(state); GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner, "done scheduling deadline timer"); } -void grpc_deadline_state_init(grpc_call_element* elem, - grpc_call_stack* call_stack, - grpc_call_combiner* call_combiner, - grpc_millis deadline) { - grpc_deadline_state* deadline_state = - static_cast<grpc_deadline_state*>(elem->call_data); - deadline_state->call_stack = call_stack; - deadline_state->call_combiner = call_combiner; +grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem, + grpc_call_stack* call_stack, + grpc_call_combiner* call_combiner, + grpc_millis deadline) + : call_stack(call_stack), call_combiner(call_combiner) { // Deadline will always be infinite on servers, so the timer will only be // set on clients with a finite deadline. if (deadline != GRPC_MILLIS_INF_FUTURE) { @@ -196,21 +197,14 @@ void grpc_deadline_state_init(grpc_call_element* elem, // create a closure to start the timer, and we schedule that closure // to be run after call stack initialization is done. struct start_timer_after_init_state* state = - static_cast<struct start_timer_after_init_state*>( - gpr_zalloc(sizeof(*state))); - state->elem = elem; - state->deadline = deadline; + grpc_core::New<start_timer_after_init_state>(elem, deadline); GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_SCHED(&state->closure, GRPC_ERROR_NONE); } } -void grpc_deadline_state_destroy(grpc_call_element* elem) { - grpc_deadline_state* deadline_state = - static_cast<grpc_deadline_state*>(elem->call_data); - cancel_timer_if_needed(deadline_state); -} +grpc_deadline_state::~grpc_deadline_state() { cancel_timer_if_needed(this); } void grpc_deadline_state_reset(grpc_call_element* elem, grpc_millis new_deadline) { @@ -269,8 +263,8 @@ typedef struct server_call_data { // Constructor for call_data. Used for both client and server filters. static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { - grpc_deadline_state_init(elem, args->call_stack, args->call_combiner, - args->deadline); + new (elem->call_data) grpc_deadline_state( + elem, args->call_stack, args->call_combiner, args->deadline); return GRPC_ERROR_NONE; } @@ -278,7 +272,9 @@ static grpc_error* init_call_elem(grpc_call_element* elem, static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { - grpc_deadline_state_destroy(elem); + grpc_deadline_state* deadline_state = + static_cast<grpc_deadline_state*>(elem->call_data); + deadline_state->~grpc_deadline_state(); } // Method for starting a call op for client filter. diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h index 1d797f445a..e37032999c 100644 --- a/src/core/ext/filters/deadline/deadline_filter.h +++ b/src/core/ext/filters/deadline/deadline_filter.h @@ -22,19 +22,23 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/timer.h" -typedef enum grpc_deadline_timer_state { +enum grpc_deadline_timer_state { GRPC_DEADLINE_STATE_INITIAL, GRPC_DEADLINE_STATE_PENDING, GRPC_DEADLINE_STATE_FINISHED -} grpc_deadline_timer_state; +}; // State used for filters that enforce call deadlines. // Must be the first field in the filter's call_data. -typedef struct grpc_deadline_state { +struct grpc_deadline_state { + grpc_deadline_state(grpc_call_element* elem, grpc_call_stack* call_stack, + grpc_call_combiner* call_combiner, grpc_millis deadline); + ~grpc_deadline_state(); + // We take a reference to the call stack for the timer callback. grpc_call_stack* call_stack; grpc_call_combiner* call_combiner; - grpc_deadline_timer_state timer_state; + grpc_deadline_timer_state timer_state = GRPC_DEADLINE_STATE_INITIAL; grpc_timer timer; grpc_closure timer_callback; // Closure to invoke when we receive trailing metadata. @@ -43,21 +47,13 @@ typedef struct grpc_deadline_state { // The original recv_trailing_metadata_ready closure, which we chain to // after our own closure is invoked. grpc_closure* original_recv_trailing_metadata_ready; -} grpc_deadline_state; +}; // // NOTE: All of these functions require that the first field in // elem->call_data is a grpc_deadline_state. // -// assumes elem->call_data is zero'd -void grpc_deadline_state_init(grpc_call_element* elem, - grpc_call_stack* call_stack, - grpc_call_combiner* call_combiner, - grpc_millis deadline); - -void grpc_deadline_state_destroy(grpc_call_element* elem); - // Cancels the existing timer and starts a new one with new_deadline. // // Note: It is generally safe to call this with an earlier deadline diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index cd459e47cd..bf9a01f659 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -37,10 +37,31 @@ #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 /* default maximum size of payload eligable for GET request */ -static const size_t kMaxPayloadSizeForGet = 2048; +static constexpr size_t kMaxPayloadSizeForGet = 2048; + +static void recv_initial_metadata_ready(void* user_data, grpc_error* error); +static void recv_trailing_metadata_ready(void* user_data, grpc_error* error); +static void on_send_message_next_done(void* arg, grpc_error* error); +static void send_message_on_complete(void* arg, grpc_error* error); namespace { struct call_data { + call_data(grpc_call_element* elem, const grpc_call_element_args& args) + : call_combiner(args.call_combiner) { + GRPC_CLOSURE_INIT(&recv_initial_metadata_ready, + ::recv_initial_metadata_ready, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready, + ::recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_send_message_next_done, ::on_send_message_next_done, + elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&send_message_on_complete, ::send_message_on_complete, + elem, grpc_schedule_on_exec_ctx); + } + + ~call_data() { GRPC_ERROR_UNREF(recv_initial_metadata_error); } + grpc_call_combiner* call_combiner; // State for handling send_initial_metadata ops. grpc_linked_mdelem method; @@ -51,18 +72,18 @@ struct call_data { grpc_linked_mdelem user_agent; // State for handling recv_initial_metadata ops. grpc_metadata_batch* recv_initial_metadata; - grpc_error* recv_initial_metadata_error; - grpc_closure* original_recv_initial_metadata_ready; + grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE; + grpc_closure* original_recv_initial_metadata_ready = nullptr; grpc_closure recv_initial_metadata_ready; // State for handling recv_trailing_metadata ops. grpc_metadata_batch* recv_trailing_metadata; grpc_closure* original_recv_trailing_metadata_ready; grpc_closure recv_trailing_metadata_ready; - grpc_error* recv_trailing_metadata_error; - bool seen_recv_trailing_metadata_ready; + grpc_error* recv_trailing_metadata_error = GRPC_ERROR_NONE; + bool seen_recv_trailing_metadata_ready = false; // State for handling send_message ops. grpc_transport_stream_op_batch* send_message_batch; - size_t send_message_bytes_read; + size_t send_message_bytes_read = 0; grpc_core::ManualConstructor<grpc_core::ByteStreamCache> send_message_cache; grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream> send_message_caching_stream; @@ -442,18 +463,7 @@ done: /* Constructor for call_data */ static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { - call_data* calld = static_cast<call_data*>(elem->call_data); - calld->call_combiner = args->call_combiner; - GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, - recv_initial_metadata_ready, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, - recv_trailing_metadata_ready, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, - elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, - on_send_message_next_done, elem, grpc_schedule_on_exec_ctx); + new (elem->call_data) call_data(elem, *args); return GRPC_ERROR_NONE; } @@ -462,7 +472,7 @@ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = static_cast<call_data*>(elem->call_data); - GRPC_ERROR_UNREF(calld->recv_initial_metadata_error); + calld->~call_data(); } static grpc_mdelem scheme_from_args(const grpc_channel_args* args) { diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index 933fe3c77b..9c8c8d9e18 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -39,6 +39,10 @@ #include "src/core/lib/surface/call.h" #include "src/core/lib/transport/static_metadata.h" +static void start_send_message_batch(void* arg, grpc_error* unused); +static void send_message_on_complete(void* arg, grpc_error* error); +static void on_send_message_next_done(void* arg, grpc_error* error); + namespace { enum initial_metadata_state { // Initial metadata not yet seen. @@ -50,6 +54,23 @@ enum initial_metadata_state { }; struct call_data { + call_data(grpc_call_element* elem, const grpc_call_element_args& args) + : call_combiner(args.call_combiner) { + GRPC_CLOSURE_INIT(&start_send_message_batch_in_call_combiner, + start_send_message_batch, elem, + grpc_schedule_on_exec_ctx); + grpc_slice_buffer_init(&slices); + GRPC_CLOSURE_INIT(&send_message_on_complete, ::send_message_on_complete, + elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_send_message_next_done, ::on_send_message_next_done, + elem, grpc_schedule_on_exec_ctx); + } + + ~call_data() { + grpc_slice_buffer_destroy_internal(&slices); + GRPC_ERROR_UNREF(cancel_error); + } + grpc_call_combiner* call_combiner; grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem stream_compression_algorithm_storage; @@ -57,11 +78,12 @@ struct call_data { grpc_linked_mdelem accept_stream_encoding_storage; /** Compression algorithm we'll try to use. It may be given by incoming * metadata, or by the channel's default compression settings. */ - grpc_message_compression_algorithm message_compression_algorithm; - initial_metadata_state send_initial_metadata_state; - grpc_error* cancel_error; + grpc_message_compression_algorithm message_compression_algorithm = + GRPC_MESSAGE_COMPRESS_NONE; + initial_metadata_state send_initial_metadata_state = INITIAL_METADATA_UNSEEN; + grpc_error* cancel_error = GRPC_ERROR_NONE; grpc_closure start_send_message_batch_in_call_combiner; - grpc_transport_stream_op_batch* send_message_batch; + grpc_transport_stream_op_batch* send_message_batch = nullptr; grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> replacement_stream; @@ -424,16 +446,7 @@ static void compress_start_transport_stream_op_batch( /* Constructor for call_data */ static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { - call_data* calld = static_cast<call_data*>(elem->call_data); - calld->call_combiner = args->call_combiner; - calld->cancel_error = GRPC_ERROR_NONE; - grpc_slice_buffer_init(&calld->slices); - GRPC_CLOSURE_INIT(&calld->start_send_message_batch_in_call_combiner, - start_send_message_batch, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, - on_send_message_next_done, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, - elem, grpc_schedule_on_exec_ctx); + new (elem->call_data) call_data(elem, *args); return GRPC_ERROR_NONE; } @@ -442,8 +455,7 @@ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = static_cast<call_data*>(elem->call_data); - grpc_slice_buffer_destroy_internal(&calld->slices); - GRPC_ERROR_UNREF(calld->cancel_error); + calld->~call_data(); } /* Constructor for channel_data */ diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 436ea09d94..ce1be8370c 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -35,9 +35,32 @@ #define EXPECTED_CONTENT_TYPE "application/grpc" #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 +static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err); +static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err); +static void hs_recv_message_ready(void* user_data, grpc_error* err); + namespace { struct call_data { + call_data(grpc_call_element* elem, const grpc_call_element_args& args) + : call_combiner(args.call_combiner) { + GRPC_CLOSURE_INIT(&recv_initial_metadata_ready, + hs_recv_initial_metadata_ready, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&recv_message_ready, hs_recv_message_ready, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready, + hs_recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); + } + + ~call_data() { + GRPC_ERROR_UNREF(recv_initial_metadata_ready_error); + if (have_read_stream) { + read_stream->Orphan(); + } + } + grpc_call_combiner* call_combiner; // Outgoing headers to add to send_initial_metadata. @@ -47,27 +70,27 @@ struct call_data { // If we see the recv_message contents in the GET query string, we // store it here. grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> read_stream; - bool have_read_stream; + bool have_read_stream = false; // State for intercepting recv_initial_metadata. grpc_closure recv_initial_metadata_ready; - grpc_error* recv_initial_metadata_ready_error; + grpc_error* recv_initial_metadata_ready_error = GRPC_ERROR_NONE; grpc_closure* original_recv_initial_metadata_ready; - grpc_metadata_batch* recv_initial_metadata; + grpc_metadata_batch* recv_initial_metadata = nullptr; uint32_t* recv_initial_metadata_flags; - bool seen_recv_initial_metadata_ready; + bool seen_recv_initial_metadata_ready = false; // State for intercepting recv_message. grpc_closure* original_recv_message_ready; grpc_closure recv_message_ready; grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; - bool seen_recv_message_ready; + bool seen_recv_message_ready = false; // State for intercepting recv_trailing_metadata grpc_closure recv_trailing_metadata_ready; grpc_closure* original_recv_trailing_metadata_ready; grpc_error* recv_trailing_metadata_ready_error; - bool seen_recv_trailing_metadata_ready; + bool seen_recv_trailing_metadata_ready = false; }; struct channel_data { @@ -431,16 +454,7 @@ static void hs_start_transport_stream_op_batch( /* Constructor for call_data */ static grpc_error* hs_init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { - call_data* calld = static_cast<call_data*>(elem->call_data); - calld->call_combiner = args->call_combiner; - GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, - hs_recv_initial_metadata_ready, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->recv_message_ready, hs_recv_message_ready, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, - hs_recv_trailing_metadata_ready, elem, - grpc_schedule_on_exec_ctx); + new (elem->call_data) call_data(elem, *args); return GRPC_ERROR_NONE; } @@ -449,10 +463,7 @@ static void hs_destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = static_cast<call_data*>(elem->call_data); - GRPC_ERROR_UNREF(calld->recv_initial_metadata_ready_error); - if (calld->have_read_stream) { - calld->read_stream->Orphan(); - } + calld->~call_data(); } /* Constructor for channel_data */ diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index 2d3b16d992..94d6942aa4 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -90,9 +90,53 @@ RefCountedPtr<MessageSizeLimits> MessageSizeLimits::CreateFromJson( } // namespace } // namespace grpc_core +static void recv_message_ready(void* user_data, grpc_error* error); +static void recv_trailing_metadata_ready(void* user_data, grpc_error* error); + namespace { +struct channel_data { + message_size_limits limits; + // Maps path names to refcounted_message_size_limits structs. + grpc_core::RefCountedPtr<grpc_core::SliceHashTable< + grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits>>> + method_limit_table; +}; + struct call_data { + call_data(grpc_call_element* elem, const channel_data& chand, + const grpc_call_element_args& args) + : call_combiner(args.call_combiner), limits(chand.limits) { + GRPC_CLOSURE_INIT(&recv_message_ready, ::recv_message_ready, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready, + ::recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); + // Get max sizes from channel data, then merge in per-method config values. + // Note: Per-method config is only available on the client, so we + // apply the max request size to the send limit and the max response + // size to the receive limit. + if (chand.method_limit_table != nullptr) { + grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits> limits = + grpc_core::ServiceConfig::MethodConfigTableLookup( + *chand.method_limit_table, args.path); + if (limits != nullptr) { + if (limits->limits().max_send_size >= 0 && + (limits->limits().max_send_size < this->limits.max_send_size || + this->limits.max_send_size < 0)) { + this->limits.max_send_size = limits->limits().max_send_size; + } + if (limits->limits().max_recv_size >= 0 && + (limits->limits().max_recv_size < this->limits.max_recv_size || + this->limits.max_recv_size < 0)) { + this->limits.max_recv_size = limits->limits().max_recv_size; + } + } + } + } + + ~call_data() { GRPC_ERROR_UNREF(error); } + grpc_call_combiner* call_combiner; message_size_limits limits; // Receive closures are chained: we inject this closure as the @@ -101,25 +145,17 @@ struct call_data { grpc_closure recv_message_ready; grpc_closure recv_trailing_metadata_ready; // The error caused by a message that is too large, or GRPC_ERROR_NONE - grpc_error* error; + grpc_error* error = GRPC_ERROR_NONE; // Used by recv_message_ready. - grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; + grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message = nullptr; // Original recv_message_ready callback, invoked after our own. - grpc_closure* next_recv_message_ready; + grpc_closure* next_recv_message_ready = nullptr; // Original recv_trailing_metadata callback, invoked after our own. grpc_closure* original_recv_trailing_metadata_ready; - bool seen_recv_trailing_metadata; + bool seen_recv_trailing_metadata = false; grpc_error* recv_trailing_metadata_error; }; -struct channel_data { - message_size_limits limits; - // Maps path names to refcounted_message_size_limits structs. - grpc_core::RefCountedPtr<grpc_core::SliceHashTable< - grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits>>> - method_limit_table; -}; - } // namespace // Callback invoked when we receive a message. Here we check the max @@ -228,38 +264,7 @@ static void start_transport_stream_op_batch( static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - calld->call_combiner = args->call_combiner; - calld->next_recv_message_ready = nullptr; - calld->original_recv_trailing_metadata_ready = nullptr; - calld->error = GRPC_ERROR_NONE; - GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, - recv_trailing_metadata_ready, elem, - grpc_schedule_on_exec_ctx); - // Get max sizes from channel data, then merge in per-method config values. - // Note: Per-method config is only available on the client, so we - // apply the max request size to the send limit and the max response - // size to the receive limit. - calld->limits = chand->limits; - if (chand->method_limit_table != nullptr) { - grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits> limits = - grpc_core::ServiceConfig::MethodConfigTableLookup( - *chand->method_limit_table, args->path); - if (limits != nullptr) { - if (limits->limits().max_send_size >= 0 && - (limits->limits().max_send_size < calld->limits.max_send_size || - calld->limits.max_send_size < 0)) { - calld->limits.max_send_size = limits->limits().max_send_size; - } - if (limits->limits().max_recv_size >= 0 && - (limits->limits().max_recv_size < calld->limits.max_recv_size || - calld->limits.max_recv_size < 0)) { - calld->limits.max_recv_size = limits->limits().max_recv_size; - } - } - } + new (elem->call_data) call_data(elem, *chand, *args); return GRPC_ERROR_NONE; } @@ -268,7 +273,7 @@ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = (call_data*)elem->call_data; - GRPC_ERROR_UNREF(calld->error); + calld->~call_data(); } static int default_size(const grpc_channel_args* args, |