diff options
author | 2018-06-11 16:33:15 -0700 | |
---|---|---|
committer | 2018-06-11 16:33:15 -0700 | |
commit | dddf20793269a7ba5f658ae5fdcde2c175cf196b (patch) | |
tree | 595d0fbc0c501132857b63f00f073ec6a0f54f58 /src/core | |
parent | 935ae7d01242c21f0230ead18927087a70c03e5b (diff) | |
parent | 37b292a0f7500e434c9ae41919248e2641ddc7f1 (diff) |
Merge branch 'master' into epollerr
Diffstat (limited to 'src/core')
39 files changed, 1422 insertions, 317 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 936d70b959..ea6775a8d8 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -891,6 +891,7 @@ typedef struct client_channel_call_data { grpc_closure pick_cancel_closure; grpc_polling_entity* pollent; + bool pollent_added_to_interested_parties; // Batches are added to this list when received from above. // They are removed when we are done handling the batch (i.e., when @@ -949,7 +950,6 @@ static void retry_commit(grpc_call_element* elem, static void start_internal_recv_trailing_metadata(grpc_call_element* elem); static void on_complete(void* arg, grpc_error* error); static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); -static void pick_after_resolver_result_start_locked(grpc_call_element* elem); static void start_pick_locked(void* arg, grpc_error* ignored); // @@ -2684,59 +2684,133 @@ static void pick_done(void* arg, grpc_error* error) { } } +static void maybe_add_call_to_channel_interested_parties_locked( + grpc_call_element* elem) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (!calld->pollent_added_to_interested_parties) { + calld->pollent_added_to_interested_parties = true; + grpc_polling_entity_add_to_pollset_set(calld->pollent, + chand->interested_parties); + } +} + +static void maybe_del_call_from_channel_interested_parties_locked( + grpc_call_element* elem) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (calld->pollent_added_to_interested_parties) { + calld->pollent_added_to_interested_parties = false; + grpc_polling_entity_del_from_pollset_set(calld->pollent, + chand->interested_parties); + } +} + // Invoked when a pick is completed to leave the client_channel combiner // and continue processing in the call combiner. +// If needed, removes the call's polling entity from chand->interested_parties. static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { call_data* calld = static_cast<call_data*>(elem->call_data); + maybe_del_call_from_channel_interested_parties_locked(elem); GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_SCHED(&calld->pick_closure, error); } -// A wrapper around pick_done_locked() that is used in cases where -// either (a) the pick was deferred pending a resolver result or (b) the -// pick was done asynchronously. Removes the call's polling entity from -// chand->interested_parties before invoking pick_done_locked(). -static void async_pick_done_locked(grpc_call_element* elem, grpc_error* error) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - grpc_polling_entity_del_from_pollset_set(calld->pollent, - chand->interested_parties); - pick_done_locked(elem, error); -} +namespace grpc_core { -// Note: This runs under the client_channel combiner, but will NOT be -// holding the call combiner. -static void pick_callback_cancel_locked(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast<grpc_call_element*>(arg); - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - // Note: chand->lb_policy may have changed since we started our pick, - // in which case we will be cancelling the pick on a policy other than - // the one we started it on. However, this will just be a no-op. - if (GPR_LIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) { +// Performs subchannel pick via LB policy. +class LbPicker { + public: + // Starts a pick on chand->lb_policy. + static void StartLocked(grpc_call_element* elem) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling pick from LB policy %p", + gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", chand, calld, chand->lb_policy.get()); } - chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error)); + // If this is a retry, use the send_initial_metadata payload that + // we've cached; otherwise, use the pending batch. The + // send_initial_metadata batch will be the first pending batch in the + // list, as set by get_batch_index() above. + calld->pick.initial_metadata = + calld->seen_send_initial_metadata + ? &calld->send_initial_metadata + : calld->pending_batches[0] + .batch->payload->send_initial_metadata.send_initial_metadata; + calld->pick.initial_metadata_flags = + calld->seen_send_initial_metadata + ? calld->send_initial_metadata_flags + : calld->pending_batches[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem, + grpc_combiner_scheduler(chand->combiner)); + calld->pick.on_complete = &calld->pick_closure; + GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); + const bool pick_done = chand->lb_policy->PickLocked(&calld->pick); + if (GPR_LIKELY(pick_done)) { + // Pick completed synchronously. + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", + chand, calld); + } + pick_done_locked(elem, GRPC_ERROR_NONE); + GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); + } else { + // Pick will be returned asynchronously. + // Add the polling entity from call_data to the channel_data's + // interested_parties, so that the I/O of the LB policy can be done + // under it. It will be removed in pick_done_locked(). + maybe_add_call_to_channel_interested_parties_locked(elem); + // Request notification on call cancellation. + GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); + grpc_call_combiner_set_notify_on_cancel( + calld->call_combiner, + GRPC_CLOSURE_INIT(&calld->pick_cancel_closure, + &LbPicker::CancelLocked, elem, + grpc_combiner_scheduler(chand->combiner))); + } } - GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); -} -// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks. -// Unrefs the LB policy and invokes async_pick_done_locked(). -static void pick_callback_done_locked(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast<grpc_call_element*>(arg); - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", chand, - calld); + private: + // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks. + // Unrefs the LB policy and invokes pick_done_locked(). + static void DoneLocked(void* arg, grpc_error* error) { + grpc_call_element* elem = static_cast<grpc_call_element*>(arg); + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", + chand, calld); + } + pick_done_locked(elem, GRPC_ERROR_REF(error)); + GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); } - async_pick_done_locked(elem, GRPC_ERROR_REF(error)); - GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); -} + + // Note: This runs under the client_channel combiner, but will NOT be + // holding the call combiner. + static void CancelLocked(void* arg, grpc_error* error) { + grpc_call_element* elem = static_cast<grpc_call_element*>(arg); + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + // Note: chand->lb_policy may have changed since we started our pick, + // in which case we will be cancelling the pick on a policy other than + // the one we started it on. However, this will just be a no-op. + if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: cancelling pick from LB policy %p", chand, + calld, chand->lb_policy.get()); + } + chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error)); + } + GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); + } +}; + +} // namespace grpc_core // Applies service config to the call. Must be invoked once we know // that the resolver has returned results to the channel. @@ -2766,6 +2840,24 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) { grpc_deadline_state_reset(elem, calld->deadline); } } + // If the service config set wait_for_ready and the application + // did not explicitly set it, use the value from the service config. + uint32_t* send_initial_metadata_flags = + &calld->pending_batches[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + if (GPR_UNLIKELY( + calld->method_params->wait_for_ready() != + ClientChannelMethodParams::WAIT_FOR_READY_UNSET && + !(*send_initial_metadata_flags & + GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) { + if (calld->method_params->wait_for_ready() == + ClientChannelMethodParams::WAIT_FOR_READY_TRUE) { + *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; + } else { + *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; + } + } } } // If no retry policy, disable retries. @@ -2776,215 +2868,164 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) { } } -// Starts a pick on chand->lb_policy. -// Returns true if pick is completed synchronously. -static bool pick_callback_start_locked(grpc_call_element* elem) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); +// Invoked once resolver results are available. +static void process_service_config_and_start_lb_pick_locked( + grpc_call_element* elem) { call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", chand, - calld, chand->lb_policy.get()); - } // Only get service config data on the first attempt. if (GPR_LIKELY(calld->num_attempts_completed == 0)) { apply_service_config_to_call_locked(elem); } - // If the application explicitly set wait_for_ready, use that. - // Otherwise, if the service config specified a value for this - // method, use that. - // - // The send_initial_metadata batch will be the first one in the list, - // as set by get_batch_index() above. - calld->pick.initial_metadata = - calld->seen_send_initial_metadata - ? &calld->send_initial_metadata - : calld->pending_batches[0] - .batch->payload->send_initial_metadata.send_initial_metadata; - uint32_t send_initial_metadata_flags = - calld->seen_send_initial_metadata - ? calld->send_initial_metadata_flags - : calld->pending_batches[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; - const bool wait_for_ready_set_from_api = - send_initial_metadata_flags & - GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; - const bool wait_for_ready_set_from_service_config = - calld->method_params != nullptr && - calld->method_params->wait_for_ready() != - ClientChannelMethodParams::WAIT_FOR_READY_UNSET; - if (GPR_UNLIKELY(!wait_for_ready_set_from_api && - wait_for_ready_set_from_service_config)) { - if (calld->method_params->wait_for_ready() == - ClientChannelMethodParams::WAIT_FOR_READY_TRUE) { - send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; - } else { - send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; - } - } - calld->pick.initial_metadata_flags = send_initial_metadata_flags; - GRPC_CLOSURE_INIT(&calld->pick_closure, pick_callback_done_locked, elem, - grpc_combiner_scheduler(chand->combiner)); - calld->pick.on_complete = &calld->pick_closure; - GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); - const bool pick_done = chand->lb_policy->PickLocked(&calld->pick); - if (GPR_LIKELY(pick_done)) { - // Pick completed synchronously. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", - chand, calld); - } - GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); - } else { - GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); - grpc_call_combiner_set_notify_on_cancel( - calld->call_combiner, - GRPC_CLOSURE_INIT(&calld->pick_cancel_closure, - pick_callback_cancel_locked, elem, - grpc_combiner_scheduler(chand->combiner))); - } - return pick_done; + // Start LB pick. + grpc_core::LbPicker::StartLocked(elem); } -typedef struct { - grpc_call_element* elem; - bool finished; - grpc_closure closure; - grpc_closure cancel_closure; -} pick_after_resolver_result_args; - -// Note: This runs under the client_channel combiner, but will NOT be -// holding the call combiner. -static void pick_after_resolver_result_cancel_locked(void* arg, - grpc_error* error) { - pick_after_resolver_result_args* args = - static_cast<pick_after_resolver_result_args*>(arg); - if (GPR_LIKELY(args->finished)) { - gpr_free(args); - return; - } - // If we don't yet have a resolver result, then a closure for - // pick_after_resolver_result_done_locked() will have been added to - // chand->waiting_for_resolver_result_closures, and it may not be invoked - // until after this call has been destroyed. We mark the operation as - // finished, so that when pick_after_resolver_result_done_locked() - // is called, it will be a no-op. We also immediately invoke - // async_pick_done_locked() to propagate the error back to the caller. - args->finished = true; - grpc_call_element* elem = args->elem; - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: cancelling pick waiting for resolver result", - chand, calld); - } - // Note: Although we are not in the call combiner here, we are - // basically stealing the call combiner from the pending pick, so - // it's safe to call async_pick_done_locked() here -- we are - // essentially calling it here instead of calling it in - // pick_after_resolver_result_done_locked(). - async_pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); -} - -static void pick_after_resolver_result_done_locked(void* arg, - grpc_error* error) { - pick_after_resolver_result_args* args = - static_cast<pick_after_resolver_result_args*>(arg); - if (GPR_UNLIKELY(args->finished)) { - /* cancelled, do nothing */ - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "call cancelled before resolver result"); - } - gpr_free(args); - return; - } - args->finished = true; - grpc_call_element* elem = args->elem; - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { +namespace grpc_core { + +// Handles waiting for a resolver result. +// Used only for the first call on an idle channel. +class ResolverResultWaiter { + public: + explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data", + gpr_log(GPR_INFO, + "chand=%p calld=%p: deferring pick pending resolver result", chand, calld); } - async_pick_done_locked(elem, GRPC_ERROR_REF(error)); - } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { - // Shutting down. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand, - calld); + // Add closure to be run when a resolver result is available. + GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this, + grpc_combiner_scheduler(chand->combiner)); + AddToWaitingList(); + // Set cancellation closure, so that we abort if the call is cancelled. + GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked, + this, grpc_combiner_scheduler(chand->combiner)); + grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, + &cancel_closure_); + } + + private: + // Adds closure_ to chand->waiting_for_resolver_result_closures. + void AddToWaitingList() { + channel_data* chand = static_cast<channel_data*>(elem_->channel_data); + grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, + &done_closure_, GRPC_ERROR_NONE); + } + + // Invoked when a resolver result is available. + static void DoneLocked(void* arg, grpc_error* error) { + ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); + // If CancelLocked() has already run, delete ourselves without doing + // anything. Note that the call stack may have already been destroyed, + // so it's not safe to access anything in elem_. + if (GPR_UNLIKELY(self->finished_)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "call cancelled before resolver result"); + } + Delete(self); + return; } - async_pick_done_locked( - elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) { - // Transient resolver failure. - // If call has wait_for_ready=true, try again; otherwise, fail. - uint32_t send_initial_metadata_flags = - calld->seen_send_initial_metadata - ? calld->send_initial_metadata_flags - : calld->pending_batches[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; - if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { + // Otherwise, process the resolver result. + grpc_call_element* elem = self->elem_; + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: resolver returned but no LB policy; " - "wait_for_ready=true; trying again", + gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data", chand, calld); } - pick_after_resolver_result_start_locked(elem); + pick_done_locked(elem, GRPC_ERROR_REF(error)); + } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { + // Shutting down. + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand, + calld); + } + pick_done_locked(elem, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) { + // Transient resolver failure. + // If call has wait_for_ready=true, try again; otherwise, fail. + uint32_t send_initial_metadata_flags = + calld->seen_send_initial_metadata + ? calld->send_initial_metadata_flags + : calld->pending_batches[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: resolver returned but no LB policy; " + "wait_for_ready=true; trying again", + chand, calld); + } + // Re-add ourselves to the waiting list. + self->AddToWaitingList(); + // Return early so that we don't set finished_ to true below. + return; + } else { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: resolver returned but no LB policy; " + "wait_for_ready=false; failing", + chand, calld); + } + pick_done_locked( + elem, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + } } else { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: resolver returned but no LB policy; " - "wait_for_ready=false; failing", + gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick", chand, calld); } - async_pick_done_locked( - elem, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + process_service_config_and_start_lb_pick_locked(elem); } - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing pick", - chand, calld); + self->finished_ = true; + } + + // Invoked when the call is cancelled. + // Note: This runs under the client_channel combiner, but will NOT be + // holding the call combiner. + static void CancelLocked(void* arg, grpc_error* error) { + ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); + // If DoneLocked() has already run, delete ourselves without doing anything. + if (GPR_LIKELY(self->finished_)) { + Delete(self); + return; } - if (GPR_LIKELY(pick_callback_start_locked(elem))) { - // Even if the LB policy returns a result synchronously, we have - // already added our polling entity to chand->interested_parties - // in order to wait for the resolver result, so we need to - // remove it here. Therefore, we call async_pick_done_locked() - // instead of pick_done_locked(). - async_pick_done_locked(elem, GRPC_ERROR_NONE); + // If we are being cancelled, immediately invoke pick_done_locked() + // to propagate the error back to the caller. + if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { + grpc_call_element* elem = self->elem_; + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: cancelling call waiting for name " + "resolution", + chand, calld); + } + // Note: Although we are not in the call combiner here, we are + // basically stealing the call combiner from the pending pick, so + // it's safe to call pick_done_locked() here -- we are essentially + // calling it here instead of calling it in DoneLocked(). + pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick cancelled", &error, 1)); } + self->finished_ = true; } -} -static void pick_after_resolver_result_start_locked(grpc_call_element* elem) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: deferring pick pending resolver result", chand, - calld); - } - pick_after_resolver_result_args* args = - static_cast<pick_after_resolver_result_args*>(gpr_zalloc(sizeof(*args))); - args->elem = elem; - GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked, - args, grpc_combiner_scheduler(chand->combiner)); - grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, - &args->closure, GRPC_ERROR_NONE); - grpc_call_combiner_set_notify_on_cancel( - calld->call_combiner, - GRPC_CLOSURE_INIT(&args->cancel_closure, - pick_after_resolver_result_cancel_locked, args, - grpc_combiner_scheduler(chand->combiner))); -} + grpc_call_element* elem_; + grpc_closure done_closure_; + grpc_closure cancel_closure_; + bool finished_ = false; +}; + +} // namespace grpc_core static void start_pick_locked(void* arg, grpc_error* ignored) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); @@ -2993,31 +3034,24 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { GPR_ASSERT(calld->pick.connected_subchannel == nullptr); GPR_ASSERT(calld->subchannel_call == nullptr); if (GPR_LIKELY(chand->lb_policy != nullptr)) { - // We already have an LB policy, so ask it for a pick. - if (GPR_LIKELY(pick_callback_start_locked(elem))) { - // Pick completed synchronously. - pick_done_locked(elem, GRPC_ERROR_NONE); - return; - } + // We already have resolver results, so process the service config + // and start an LB pick. + process_service_config_and_start_lb_pick_locked(elem); + } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { + pick_done_locked(elem, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); } else { // We do not yet have an LB policy, so wait for a resolver result. - if (GPR_UNLIKELY(chand->resolver == nullptr)) { - pick_done_locked(elem, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - return; - } if (GPR_UNLIKELY(!chand->started_resolving)) { start_resolving_locked(chand); } - pick_after_resolver_result_start_locked(elem); + // Create a new waiter, which will delete itself when done. + grpc_core::New<grpc_core::ResolverResultWaiter>(elem); + // Add the polling entity from call_data to the channel_data's + // interested_parties, so that the I/O of the resolver can be done + // under it. It will be removed in pick_done_locked(). + maybe_add_call_to_channel_interested_parties_locked(elem); } - // We need to wait for either a resolver result or for an async result - // from the LB policy. Add the polling entity from call_data to the - // channel_data's interested_parties, so that the I/O of the LB policy - // and resolver can be done under it. The polling entity will be - // removed in async_pick_done_locked(). - grpc_polling_entity_add_to_pollset_set(calld->pollent, - chand->interested_parties); } // diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index 73d2af6e42..00a84302ed 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -18,7 +18,7 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/port.h" -#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) +#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) #include <ares.h> #include <sys/ioctl.h> @@ -348,4 +348,4 @@ void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver) { gpr_mu_unlock(&ev_driver->mu); } -#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) */ +#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index cc4a823798..a8090d18a6 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1451,10 +1451,8 @@ static void perform_stream_op_locked(void* stream_op, } } if (op_payload->send_initial_metadata.peer_string != nullptr) { - char* old_peer_string = (char*)gpr_atm_full_xchg( - op_payload->send_initial_metadata.peer_string, - (gpr_atm)gpr_strdup(t->peer_string)); - gpr_free(old_peer_string); + gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string, + (gpr_atm)t->peer_string); } } @@ -1569,10 +1567,8 @@ static void perform_stream_op_locked(void* stream_op, s->trailing_metadata_available = op_payload->recv_initial_metadata.trailing_metadata_available; if (op_payload->recv_initial_metadata.peer_string != nullptr) { - char* old_peer_string = (char*)gpr_atm_full_xchg( - op_payload->recv_initial_metadata.peer_string, - (gpr_atm)gpr_strdup(t->peer_string)); - gpr_free(old_peer_string); + gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string, + (gpr_atm)t->peer_string); } grpc_chttp2_maybe_complete_recv_initial_metadata(t, s); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc index d5ef063883..0eaf63f133 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc @@ -41,14 +41,18 @@ #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/timeout_encoding.h" -#define HASH_FRAGMENT_1(x) ((x)&255) -#define HASH_FRAGMENT_2(x) ((x >> 8) & 255) -#define HASH_FRAGMENT_3(x) ((x >> 16) & 255) -#define HASH_FRAGMENT_4(x) ((x >> 24) & 255) +#define HASH_FRAGMENT_MASK (GRPC_CHTTP2_HPACKC_NUM_VALUES - 1) +#define HASH_FRAGMENT_1(x) ((x)&HASH_FRAGMENT_MASK) +#define HASH_FRAGMENT_2(x) \ + (((x) >> GRPC_CHTTP2_HPACKC_NUM_VALUES_BITS) & HASH_FRAGMENT_MASK) +#define HASH_FRAGMENT_3(x) \ + (((x) >> (GRPC_CHTTP2_HPACKC_NUM_VALUES_BITS * 2)) & HASH_FRAGMENT_MASK) +#define HASH_FRAGMENT_4(x) \ + (((x) >> (GRPC_CHTTP2_HPACKC_NUM_VALUES_BITS * 3)) & HASH_FRAGMENT_MASK) /* if the probability of this item being seen again is < 1/x then don't add it to the table */ -#define ONE_ON_ADD_PROBABILITY 128 +#define ONE_ON_ADD_PROBABILITY (GRPC_CHTTP2_HPACKC_NUM_VALUES >> 1) /* don't consider adding anything bigger than this to the hpack table */ #define MAX_DECODER_SPACE_USAGE 512 @@ -135,7 +139,7 @@ static void inc_filter(uint8_t idx, uint32_t* sum, uint8_t* elems) { } else { int i; *sum = 0; - for (i = 0; i < GRPC_CHTTP2_HPACKC_NUM_FILTERS; i++) { + for (i = 0; i < GRPC_CHTTP2_HPACKC_NUM_VALUES; i++) { elems[i] /= 2; (*sum) += elems[i]; } diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.h b/src/core/ext/transport/chttp2/transport/hpack_encoder.h index b370932131..e31a7399d7 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.h +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.h @@ -28,8 +28,9 @@ #include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/transport.h" -#define GRPC_CHTTP2_HPACKC_NUM_FILTERS 256 -#define GRPC_CHTTP2_HPACKC_NUM_VALUES 256 +// This should be <= 8. We use 6 to save space. +#define GRPC_CHTTP2_HPACKC_NUM_VALUES_BITS 6 +#define GRPC_CHTTP2_HPACKC_NUM_VALUES (1 << GRPC_CHTTP2_HPACKC_NUM_VALUES_BITS) /* initial table size, per spec */ #define GRPC_CHTTP2_HPACKC_INITIAL_TABLE_SIZE 4096 /* maximum table size we'll actually use */ @@ -58,7 +59,7 @@ typedef struct { a new literal should be added to the compression table or not. They track a single integer that counts how often a particular value has been seen. When that count reaches max (255), all values are halved. */ - uint8_t filter_elems[GRPC_CHTTP2_HPACKC_NUM_FILTERS]; + uint8_t filter_elems[GRPC_CHTTP2_HPACKC_NUM_VALUES]; /* entry tables for keys & elems: these tables track values that have been seen and *may* be in the decompressor table */ diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc new file mode 100644 index 0000000000..30f4e65632 --- /dev/null +++ b/src/core/lib/iomgr/cfstream_handle.cc @@ -0,0 +1,183 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM +#import <CoreFoundation/CoreFoundation.h> +#import "src/core/lib/iomgr/cfstream_handle.h" + +#include <grpc/support/atm.h> +#include <grpc/support/sync.h> + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +void* CFStreamHandle::Retain(void* info) { + CFStreamHandle* handle = static_cast<CFStreamHandle*>(info); + CFSTREAM_HANDLE_REF(handle, "retain"); + return info; +} + +void CFStreamHandle::Release(void* info) { + CFStreamHandle* handle = static_cast<CFStreamHandle*>(info); + CFSTREAM_HANDLE_UNREF(handle, "release"); +} + +CFStreamHandle* CFStreamHandle::CreateStreamHandle( + CFReadStreamRef read_stream, CFWriteStreamRef write_stream) { + return new CFStreamHandle(read_stream, write_stream); +} + +void CFStreamHandle::ReadCallback(CFReadStreamRef stream, + CFStreamEventType type, + void* client_callback_info) { + CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info); + CFSTREAM_HANDLE_REF(handle, "read callback"); + dispatch_async( + dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + grpc_core::ExecCtx exec_ctx; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle, + stream, type, client_callback_info); + } + switch (type) { + case kCFStreamEventOpenCompleted: + handle->open_event_.SetReady(); + break; + case kCFStreamEventHasBytesAvailable: + case kCFStreamEventEndEncountered: + handle->read_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + handle->open_event_.SetReady(); + handle->read_event_.SetReady(); + break; + default: + GPR_UNREACHABLE_CODE(return ); + } + CFSTREAM_HANDLE_UNREF(handle, "read callback"); + }); +} +void CFStreamHandle::WriteCallback(CFWriteStreamRef stream, + CFStreamEventType type, + void* clientCallBackInfo) { + CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo); + CFSTREAM_HANDLE_REF(handle, "write callback"); + dispatch_async( + dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + grpc_core::ExecCtx exec_ctx; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle, + stream, type, clientCallBackInfo); + } + switch (type) { + case kCFStreamEventOpenCompleted: + handle->open_event_.SetReady(); + break; + case kCFStreamEventCanAcceptBytes: + case kCFStreamEventEndEncountered: + handle->write_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + handle->open_event_.SetReady(); + handle->write_event_.SetReady(); + break; + default: + GPR_UNREACHABLE_CODE(return ); + } + CFSTREAM_HANDLE_UNREF(handle, "write callback"); + }); +} + +CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream) { + gpr_ref_init(&refcount_, 1); + open_event_.InitEvent(); + read_event_.InitEvent(); + write_event_.InitEvent(); + CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil}; + CFReadStreamSetClient( + read_stream, + kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + CFStreamHandle::ReadCallback, &ctx); + CFWriteStreamSetClient( + write_stream, + kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + CFStreamHandle::WriteCallback, &ctx); + CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(), + kCFRunLoopCommonModes); + CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(), + kCFRunLoopCommonModes); +} + +CFStreamHandle::~CFStreamHandle() { + open_event_.DestroyEvent(); + read_event_.DestroyEvent(); + write_event_.DestroyEvent(); +} + +void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) { + open_event_.NotifyOn(closure); +} + +void CFStreamHandle::NotifyOnRead(grpc_closure* closure) { + read_event_.NotifyOn(closure); +} + +void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) { + write_event_.NotifyOn(closure); +} + +void CFStreamHandle::Shutdown(grpc_error* error) { + open_event_.SetShutdown(GRPC_ERROR_REF(error)); + read_event_.SetShutdown(GRPC_ERROR_REF(error)); + write_event_.SetShutdown(GRPC_ERROR_REF(error)); + GRPC_ERROR_UNREF(error); +} + +void CFStreamHandle::Ref(const char* file, int line, const char* reason) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, + reason, val, val + 1); + } + gpr_ref(&refcount_); +} + +void CFStreamHandle::Unref(const char* file, int line, const char* reason) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); + gpr_log(GPR_ERROR, + "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, + reason, val, val - 1); + } + if (gpr_unref(&refcount_)) { + delete this; + } +} + +#endif diff --git a/src/core/lib/iomgr/cfstream_handle.h b/src/core/lib/iomgr/cfstream_handle.h new file mode 100644 index 0000000000..4258e72431 --- /dev/null +++ b/src/core/lib/iomgr/cfstream_handle.h @@ -0,0 +1,80 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* The CFStream handle acts as an event synchronization entity for + * read/write/open/error/eos events happening on CFStream streams. */ + +#ifndef GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H +#define GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM +#import <CoreFoundation/CoreFoundation.h> + +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/lockfree_event.h" + +class CFStreamHandle final { + public: + static CFStreamHandle* CreateStreamHandle(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream); + ~CFStreamHandle(); + CFStreamHandle(const CFReadStreamRef& ref) = delete; + CFStreamHandle(CFReadStreamRef&& ref) = delete; + CFStreamHandle& operator=(const CFStreamHandle& rhs) = delete; + + void NotifyOnOpen(grpc_closure* closure); + void NotifyOnRead(grpc_closure* closure); + void NotifyOnWrite(grpc_closure* closure); + void Shutdown(grpc_error* error); + + void Ref(const char* file = "", int line = 0, const char* reason = nullptr); + void Unref(const char* file = "", int line = 0, const char* reason = nullptr); + + private: + CFStreamHandle(CFReadStreamRef read_stream, CFWriteStreamRef write_stream); + static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type, + void* client_callback_info); + static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type, + void* client_callback_info); + static void* Retain(void* info); + static void Release(void* info); + + grpc_core::LockfreeEvent open_event_; + grpc_core::LockfreeEvent read_event_; + grpc_core::LockfreeEvent write_event_; + + gpr_refcount refcount_; +}; + +#ifdef DEBUG +#define CFSTREAM_HANDLE_REF(handle, reason) \ + (handle)->Ref(__FILE__, __LINE__, (reason)) +#define CFSTREAM_HANDLE_UNREF(handle, reason) \ + (handle)->Unref(__FILE__, __LINE__, (reason)) +#else +#define CFSTREAM_HANDLE_REF(handle, reason) (handle)->Ref() +#define CFSTREAM_HANDLE_UNREF(handle, reason) (handle)->Unref() +#endif + +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H */ diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc new file mode 100644 index 0000000000..c3bc0cc8fd --- /dev/null +++ b/src/core/lib/iomgr/endpoint_cfstream.cc @@ -0,0 +1,372 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM_ENDPOINT + +#import <CoreFoundation/CoreFoundation.h> +#import "src/core/lib/iomgr/endpoint_cfstream.h" + +#include <grpc/slice_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/cfstream_handle.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/error_cfstream.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +typedef struct { + grpc_endpoint base; + gpr_refcount refcount; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + CFStreamHandle* stream_sync; + + grpc_closure* read_cb; + grpc_closure* write_cb; + grpc_slice_buffer* read_slices; + grpc_slice_buffer* write_slices; + + grpc_closure read_action; + grpc_closure write_action; + + char* peer_string; + grpc_resource_user* resource_user; + grpc_resource_user_slice_allocator slice_allocator; +} CFStreamEndpoint; + +static void CFStreamFree(CFStreamEndpoint* ep) { + grpc_resource_user_unref(ep->resource_user); + CFRelease(ep->read_stream); + CFRelease(ep->write_stream); + CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free"); + gpr_free(ep->peer_string); + gpr_free(ep); +} + +#ifndef NDEBUG +#define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__) +#define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__) +static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason, + const char* file, int line) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "CFStream endpoint unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, + reason, val, val - 1); + } + if (gpr_unref(&ep->refcount)) { + CFStreamFree(ep); + } +} +static void CFStreamRef(CFStreamEndpoint* ep, const char* reason, + const char* file, int line) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "CFStream endpoint ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, + reason, val, val + 1); + } + gpr_ref(&ep->refcount); +} +#else +#define EP_REF(ep, reason) CFStreamRef((ep)) +#define EP_UNREF(ep, reason) CFStreamUnref((ep)) +static void CFStreamUnref(CFStreamEndpoint* ep) { + if (gpr_unref(&ep->refcount)) { + CFStreamFree(ep); + } +} +static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); } +#endif + +static grpc_error* CFStreamAnnotateError(grpc_error* src_error, + CFStreamEndpoint* ep) { + return grpc_error_set_str( + grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE), + GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(ep->peer_string)); +} + +static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_read_cb %p %p:%p", ep, + ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg); + size_t i; + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "read: error=%s", str); + + for (i = 0; i < ep->read_slices->count; i++) { + char* dump = grpc_dump_slice(ep->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string, dump); + gpr_free(dump); + } + } + grpc_closure* cb = ep->read_cb; + ep->read_cb = nullptr; + ep->read_slices = nullptr; + GRPC_CLOSURE_SCHED(cb, error); +} + +static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_write_cb %p %p:%p", ep, + ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg); + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "write: error=%s", str); + } + grpc_closure* cb = ep->write_cb; + ep->write_cb = nullptr; + ep->write_slices = nullptr; + GRPC_CLOSURE_SCHED(cb, error); +} + +static void ReadAction(void* arg, grpc_error* error) { + CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg); + GPR_ASSERT(ep->read_cb != nullptr); + if (error) { + grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); + CallReadCb(ep, GRPC_ERROR_REF(error)); + EP_UNREF(ep, "read"); + return; + } + + GPR_ASSERT(ep->read_slices->count == 1); + grpc_slice slice = ep->read_slices->slices[0]; + size_t len = GRPC_SLICE_LENGTH(slice); + CFIndex read_size = + CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len); + if (read_size == -1) { + grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); + CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream); + if (stream_error != nullptr) { + error = CFStreamAnnotateError( + GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), ep); + CFRelease(stream_error); + } else { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error"); + } + CallReadCb(ep, error); + EP_UNREF(ep, "read"); + } else if (read_size == 0) { + grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); + CallReadCb(ep, + CFStreamAnnotateError( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep)); + EP_UNREF(ep, "read"); + } else { + if (read_size < len) { + grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr); + } + CallReadCb(ep, GRPC_ERROR_NONE); + EP_UNREF(ep, "read"); + } +} + +static void WriteAction(void* arg, grpc_error* error) { + CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg); + GPR_ASSERT(ep->write_cb != nullptr); + if (error) { + grpc_slice_buffer_reset_and_unref_internal(ep->write_slices); + CallWriteCb(ep, GRPC_ERROR_REF(error)); + EP_UNREF(ep, "write"); + return; + } + + grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices); + size_t slice_len = GRPC_SLICE_LENGTH(slice); + CFIndex write_size = CFWriteStreamWrite( + ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len); + if (write_size == -1) { + grpc_slice_buffer_reset_and_unref_internal(ep->write_slices); + CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream); + if (stream_error != nullptr) { + error = CFStreamAnnotateError( + GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), ep); + CFRelease(stream_error); + } else { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed."); + } + CallWriteCb(ep, error); + EP_UNREF(ep, "write"); + } else { + if (write_size < GRPC_SLICE_LENGTH(slice)) { + grpc_slice_buffer_undo_take_first( + ep->write_slices, grpc_slice_sub(slice, write_size, slice_len)); + } + if (ep->write_slices->length > 0) { + ep->stream_sync->NotifyOnWrite(&ep->write_action); + } else { + CallWriteCb(ep, GRPC_ERROR_NONE); + EP_UNREF(ep, "write"); + } + + if (grpc_tcp_trace.enabled()) { + grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size); + char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string, dump); + gpr_free(dump); + grpc_slice_unref_internal(trace_slice); + } + } + grpc_slice_unref_internal(slice); +} + +static void CFStreamReadAllocationDone(void* arg, grpc_error* error) { + CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg); + if (error == GRPC_ERROR_NONE) { + ep->stream_sync->NotifyOnRead(&ep->read_action); + } else { + grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); + CallReadCb(ep, error); + EP_UNREF(ep, "read"); + } +} + +static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl, + slices, cb, slices->length); + } + GPR_ASSERT(ep_impl->read_cb == nullptr); + ep_impl->read_cb = cb; + ep_impl->read_slices = slices; + grpc_slice_buffer_reset_and_unref_internal(slices); + grpc_resource_user_alloc_slices(&ep_impl->slice_allocator, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, + ep_impl->read_slices); + EP_REF(ep_impl, "read"); +} + +static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu", + ep_impl, slices, cb, slices->length); + } + GPR_ASSERT(ep_impl->write_cb == nullptr); + ep_impl->write_cb = cb; + ep_impl->write_slices = slices; + EP_REF(ep_impl, "write"); + ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action); +} + +void CFStreamShutdown(grpc_endpoint* ep, grpc_error* why) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%p)", ep_impl, why); + } + CFReadStreamClose(ep_impl->read_stream); + CFWriteStreamClose(ep_impl->write_stream); + ep_impl->stream_sync->Shutdown(why); + grpc_resource_user_shutdown(ep_impl->resource_user); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%p)", ep_impl, why); + } +} + +void CFStreamDestroy(grpc_endpoint* ep) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl); + } + EP_UNREF(ep_impl, "destroy"); +} + +grpc_resource_user* CFStreamGetResourceUser(grpc_endpoint* ep) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + return ep_impl->resource_user; +} + +char* CFStreamGetPeer(grpc_endpoint* ep) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + return gpr_strdup(ep_impl->peer_string); +} + +int CFStreamGetFD(grpc_endpoint* ep) { return 0; } + +void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {} +void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} +void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep, + grpc_pollset_set* pollset) {} + +static const grpc_endpoint_vtable vtable = {CFStreamRead, + CFStreamWrite, + CFStreamAddToPollset, + CFStreamAddToPollsetSet, + CFStreamDeleteFromPollsetSet, + CFStreamShutdown, + CFStreamDestroy, + CFStreamGetResourceUser, + CFStreamGetPeer, + CFStreamGetFD}; + +grpc_endpoint* grpc_cfstream_endpoint_create( + CFReadStreamRef read_stream, CFWriteStreamRef write_stream, + const char* peer_string, grpc_resource_quota* resource_quota, + CFStreamHandle* stream_sync) { + CFStreamEndpoint* ep_impl = + static_cast<CFStreamEndpoint*>(gpr_malloc(sizeof(CFStreamEndpoint))); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, + "CFStream endpoint:%p create readStream:%p writeStream: %p", + ep_impl, read_stream, write_stream); + } + ep_impl->base.vtable = &vtable; + gpr_ref_init(&ep_impl->refcount, 1); + ep_impl->read_stream = read_stream; + ep_impl->write_stream = write_stream; + CFRetain(read_stream); + CFRetain(write_stream); + ep_impl->stream_sync = stream_sync; + CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create"); + + ep_impl->peer_string = gpr_strdup(peer_string); + ep_impl->read_cb = nil; + ep_impl->write_cb = nil; + ep_impl->read_slices = nil; + ep_impl->write_slices = nil; + GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction, + static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction, + static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx); + ep_impl->resource_user = + grpc_resource_user_create(resource_quota, peer_string); + grpc_resource_user_slice_allocator_init(&ep_impl->slice_allocator, + ep_impl->resource_user, + CFStreamReadAllocationDone, ep_impl); + + return &ep_impl->base; +} + +#endif /* GRPC_CFSTREAM_ENDPOINT */ diff --git a/src/core/lib/iomgr/endpoint_cfstream.h b/src/core/lib/iomgr/endpoint_cfstream.h new file mode 100644 index 0000000000..ef957c1f11 --- /dev/null +++ b/src/core/lib/iomgr/endpoint_cfstream.h @@ -0,0 +1,49 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H +#define GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H +/* + Low level TCP "bottom half" implementation, for use by transports built on + top of a TCP connection. + + Note that this file does not (yet) include APIs for creating the socket in + the first place. + + All calls passing slice transfer ownership of a slice refcount unless + otherwise specified. +*/ + +#include <grpc/support/port_platform.h> + +#ifdef GRPC_CFSTREAM + +#import <CoreFoundation/CoreFoundation.h> + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/cfstream_handle.h" +#include "src/core/lib/iomgr/endpoint.h" + +grpc_endpoint* grpc_cfstream_endpoint_create( + CFReadStreamRef read_stream, CFWriteStreamRef write_stream, + const char* peer_string, grpc_resource_quota* resource_quota, + CFStreamHandle* stream_sync); + +#endif /* GRPC_CFSTREAM */ + +#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H */ diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 4088cf612e..90ed34da11 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -312,6 +312,12 @@ static void internal_add_error(grpc_error** err, grpc_error* new_err) { // It is very common to include and extra int and string in an error #define SURPLUS_CAPACITY (2 * SLOTS_PER_INT + SLOTS_PER_TIME) +static bool g_error_creation_allowed = true; + +void grpc_disable_error_creation() { g_error_creation_allowed = false; } + +void grpc_enable_error_creation() { g_error_creation_allowed = true; } + grpc_error* grpc_error_create(const char* file, int line, grpc_slice desc, grpc_error** referencing, size_t num_referencing) { @@ -326,6 +332,12 @@ grpc_error* grpc_error_create(const char* file, int line, grpc_slice desc, return GRPC_ERROR_OOM; } #ifndef NDEBUG + if (!g_error_creation_allowed) { + gpr_log(GPR_ERROR, + "Error creation occurred when error creation was disabled [%s:%d]", + file, line); + abort(); + } if (grpc_trace_error_refcount.enabled()) { gpr_log(GPR_DEBUG, "%p create [%s:%d]", err, file, line); } diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index f8cae4da82..27c4d22fd1 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -123,6 +123,11 @@ typedef enum { #define GRPC_ERROR_OOM ((grpc_error*)2) #define GRPC_ERROR_CANCELLED ((grpc_error*)4) +// debug only toggles that allow for a sanity to check that ensures we will +// never create any errors in the per-RPC hotpath. +void grpc_disable_error_creation(); +void grpc_enable_error_creation(); + const char* grpc_error_string(grpc_error* error); /// Create an error - but use GRPC_ERROR_CREATE instead diff --git a/src/core/lib/iomgr/error_cfstream.cc b/src/core/lib/iomgr/error_cfstream.cc new file mode 100644 index 0000000000..d7af8c377f --- /dev/null +++ b/src/core/lib/iomgr/error_cfstream.cc @@ -0,0 +1,52 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GRPC_CFSTREAM +#include <CoreFoundation/CoreFoundation.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/iomgr/error.h" + +#define MAX_ERROR_DESCRIPTION 256 + +grpc_error* grpc_error_create_from_cferror(const char* file, int line, + void* arg, const char* custom_desc) { + CFErrorRef error = static_cast<CFErrorRef>(arg); + char buf_domain[MAX_ERROR_DESCRIPTION]; + char buf_desc[MAX_ERROR_DESCRIPTION]; + char* error_msg; + CFErrorDomain domain = CFErrorGetDomain((error)); + CFIndex code = CFErrorGetCode((error)); + CFStringRef desc = CFErrorCopyDescription((error)); + CFStringGetCString(domain, buf_domain, MAX_ERROR_DESCRIPTION, + kCFStringEncodingUTF8); + CFStringGetCString(desc, buf_desc, MAX_ERROR_DESCRIPTION, + kCFStringEncodingUTF8); + gpr_asprintf(&error_msg, "%s (error domain:%s, code:%ld, description:%s)", + custom_desc, buf_domain, code, buf_desc); + CFRelease(desc); + grpc_error* return_error = grpc_error_create( + file, line, grpc_slice_from_copied_string(error_msg), NULL, 0); + gpr_free(error_msg); + return return_error; +} +#endif /* GRPC_CFSTREAM */ diff --git a/src/core/lib/iomgr/error_cfstream.h b/src/core/lib/iomgr/error_cfstream.h new file mode 100644 index 0000000000..06ab751329 --- /dev/null +++ b/src/core/lib/iomgr/error_cfstream.h @@ -0,0 +1,31 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_ERROR_CFSTREAM_H +#define GRPC_CORE_LIB_IOMGR_ERROR_CFSTREAM_H + +#ifdef GRPC_CFSTREAM +// Create an error from Apple Core Foundation CFError object +#define GRPC_ERROR_CREATE_FROM_CFERROR(error, desc) \ + grpc_error_create_from_cferror(__FILE__, __LINE__, \ + static_cast<void*>((error)), (desc)) +grpc_error* grpc_error_create_from_cferror(const char* file, int line, + void* arg, const char* desc); +#endif /* GRPC_CFSTREAM */ + +#endif /* GRPC_CORE_LIB_IOMGR_ERROR_CFSTREAM_H */ diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 554522598c..98ab974057 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1264,12 +1264,12 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { } #else /* defined(GRPC_LINUX_EPOLL) */ -#if defined(GRPC_POSIX_SOCKET) +#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1) #include "src/core/lib/iomgr/ev_epoll1_linux.h" /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return * NULL */ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { return nullptr; } -#endif /* defined(GRPC_POSIX_SOCKET) */ +#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */ #endif /* !defined(GRPC_LINUX_EPOLL) */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index be400c6ee9..f22eb8ee87 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1586,7 +1586,7 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( } #else /* defined(GRPC_LINUX_EPOLL_CREATE1) */ -#if defined(GRPC_POSIX_SOCKET) +#if defined(GRPC_POSIX_SOCKET_EV_EPOLLEX) #include "src/core/lib/iomgr/ev_epollex_linux.h" /* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means epoll_create1 is not available. Return NULL */ @@ -1594,6 +1594,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( bool explicitly_requested) { return nullptr; } -#endif /* defined(GRPC_POSIX_SOCKET) */ +#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLLEX) */ #endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index 6c0f0c91f3..20512e31ac 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -1750,7 +1750,7 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux( } #else /* defined(GRPC_LINUX_EPOLL_CREATE1) */ -#if defined(GRPC_POSIX_SOCKET) +#if defined(GRPC_POSIX_SOCKET_EV_EPOLLSIG) #include "src/core/lib/iomgr/ev_epollsig_linux.h" /* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means epoll_create1 is not available. Return NULL */ diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 06378bba25..df6b0e1e89 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_EV_POLL #include "src/core/lib/iomgr/ev_poll_posix.h" @@ -1769,4 +1769,4 @@ const grpc_event_engine_vtable* grpc_init_poll_cv_posix(bool explicit_request) { return &vtable; } -#endif +#endif /* GRPC_POSIX_SOCKET_EV_POLL */ diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index efa91045d4..82b21df7ba 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_EV #include "src/core/lib/iomgr/ev_posix.h" @@ -343,4 +343,4 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { g_event_engine->pollset_set_del_fd(pollset_set, fd); } -#endif // GRPC_POSIX_SOCKET +#endif // GRPC_POSIX_SOCKET_EV diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc index 66c9cb7ff7..ca7334c9a4 100644 --- a/src/core/lib/iomgr/iomgr_posix.cc +++ b/src/core/lib/iomgr/iomgr_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_IOMGR #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" @@ -64,4 +64,4 @@ void grpc_set_default_iomgr_platform() { grpc_set_iomgr_platform_vtable(&vtable); } -#endif /* GRPC_POSIX_SOCKET */ +#endif /* GRPC_POSIX_SOCKET_IOMGR */ diff --git a/src/core/lib/iomgr/polling_entity.cc b/src/core/lib/iomgr/polling_entity.cc index 9f164f65b0..dea07cae53 100644 --- a/src/core/lib/iomgr/polling_entity.cc +++ b/src/core/lib/iomgr/polling_entity.cc @@ -61,8 +61,11 @@ bool grpc_polling_entity_is_empty(const grpc_polling_entity* pollent) { void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent, grpc_pollset_set* pss_dst) { if (pollent->tag == GRPC_POLLS_POLLSET) { - GPR_ASSERT(pollent->pollent.pollset != nullptr); - grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset); + // CFStream does not use file destriptors. When CFStream is used, the fd + // pollset is possible to be null. + if (pollent->pollent.pollset != nullptr) { + grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset); + } } else if (pollent->tag == GRPC_POLLS_POLLSET_SET) { GPR_ASSERT(pollent->pollent.pollset_set != nullptr); grpc_pollset_set_add_pollset_set(pss_dst, pollent->pollent.pollset_set); @@ -75,8 +78,14 @@ void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent, void grpc_polling_entity_del_from_pollset_set(grpc_polling_entity* pollent, grpc_pollset_set* pss_dst) { if (pollent->tag == GRPC_POLLS_POLLSET) { +#ifdef GRPC_CFSTREAM + if (pollent->pollent.pollset != nullptr) { + grpc_pollset_set_del_pollset(pss_dst, pollent->pollent.pollset); + } +#else GPR_ASSERT(pollent->pollent.pollset != nullptr); grpc_pollset_set_del_pollset(pss_dst, pollent->pollent.pollset); +#endif } else if (pollent->tag == GRPC_POLLS_POLLSET_SET) { GPR_ASSERT(pollent->pollent.pollset_set != nullptr); grpc_pollset_set_del_pollset_set(pss_dst, pollent->pollent.pollset_set); diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index a397012003..80d8e63cdd 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -97,7 +97,26 @@ #define GRPC_MSG_IOVLEN_TYPE int #define GRPC_POSIX_FORK 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 +#ifdef GRPC_CFSTREAM +#define GRPC_POSIX_SOCKET_IOMGR 1 +#define GRPC_CFSTREAM_ENDPOINT 1 +#define GRPC_CFSTREAM_CLIENT 1 +#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1 +#define GRPC_POSIX_SOCKET_EV 1 +#define GRPC_POSIX_SOCKET_EV_EPOLL1 1 +#define GRPC_POSIX_SOCKET_EV_EPOLLEX 1 +#define GRPC_POSIX_SOCKET_EV_EPOLLSIG 1 +#define GRPC_POSIX_SOCKET_EV_POLL 1 +#define GRPC_POSIX_SOCKET_RESOLVE_ADDRESS 1 +#define GRPC_POSIX_SOCKET_SOCKADDR 1 +#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1 +#define GRPC_POSIX_SOCKET_TCP 1 +#define GRPC_POSIX_SOCKET_TCP_SERVER 1 +#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1 +#define GRPC_POSIX_SOCKET_UTILS_COMMON 1 +#else #define GRPC_POSIX_SOCKET 1 +#endif #define GRPC_POSIX_SOCKETUTILS 1 #define GRPC_POSIX_SYSCONF 1 #define GRPC_POSIX_WAKEUP_FD 1 @@ -131,12 +150,30 @@ #endif #if defined(GRPC_POSIX_SOCKET) + defined(GRPC_WINSOCK_SOCKET) + \ - defined(GRPC_CUSTOM_SOCKET) != \ + defined(GRPC_CUSTOM_SOCKET) + defined(GRPC_CFSTREAM) != \ 1 #error \ "Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GRPC_CUSTOM_SOCKET" #endif +#ifdef GRPC_POSIX_SOCKET +#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1 +#define GRPC_POSIX_SOCKET_EV 1 +#define GRPC_POSIX_SOCKET_EV_EPOLLEX 1 +#define GRPC_POSIX_SOCKET_EV_EPOLLSIG 1 +#define GRPC_POSIX_SOCKET_EV_POLL 1 +#define GRPC_POSIX_SOCKET_EV_EPOLL1 1 +#define GRPC_POSIX_SOCKET_IOMGR 1 +#define GRPC_POSIX_SOCKET_RESOLVE_ADDRESS 1 +#define GRPC_POSIX_SOCKET_SOCKADDR 1 +#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1 +#define GRPC_POSIX_SOCKET_TCP 1 +#define GRPC_POSIX_SOCKET_TCP_CLIENT 1 +#define GRPC_POSIX_SOCKET_TCP_SERVER 1 +#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1 +#define GRPC_POSIX_SOCKET_UTILS_COMMON 1 +#endif + #if defined(GRPC_POSIX_HOST_NAME_MAX) && defined(GRPC_POSIX_SYSCONF) #error "Cannot define both GRPC_POSIX_HOST_NAME_MAX and GRPC_POSIX_SYSCONF" #endif diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index fe0d834582..6afe94a7a9 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -33,7 +33,7 @@ #include <ws2tcpip.h> #endif -#ifdef GRPC_POSIX_SOCKET +#if defined(GRPC_POSIX_SOCKET) || defined(GRPC_CFSTREAM) #include <sys/socket.h> #endif diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index a82075542f..7a825643e1 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -19,7 +19,7 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_RESOLVE_ADDRESS #include "src/core/lib/iomgr/sockaddr.h" diff --git a/src/core/lib/iomgr/sockaddr_posix.h b/src/core/lib/iomgr/sockaddr_posix.h index 5b18bbc465..3cedd9082d 100644 --- a/src/core/lib/iomgr/sockaddr_posix.h +++ b/src/core/lib/iomgr/sockaddr_posix.h @@ -23,7 +23,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_SOCKADDR #include <arpa/inet.h> #include <netdb.h> #include <netinet/in.h> diff --git a/src/core/lib/iomgr/socket_factory_posix.cc b/src/core/lib/iomgr/socket_factory_posix.cc index 1d1e36c0e3..57137769c8 100644 --- a/src/core/lib/iomgr/socket_factory_posix.cc +++ b/src/core/lib/iomgr/socket_factory_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_SOCKET_FACTORY #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/useful.h" diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc index 04a1767731..caee652307 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.cc +++ b/src/core/lib/iomgr/socket_utils_common_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON #include "src/core/lib/iomgr/socket_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc new file mode 100644 index 0000000000..ffed3bbef6 --- /dev/null +++ b/src/core/lib/iomgr/tcp_client_cfstream.cc @@ -0,0 +1,216 @@ + +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM_CLIENT + +#include <CoreFoundation/CoreFoundation.h> + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> + +#include <netinet/in.h> + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/host_port.h" +#include "src/core/lib/iomgr/cfstream_handle.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/endpoint_cfstream.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/error_cfstream.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/timer.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +typedef struct CFStreamConnect { + gpr_mu mu; + gpr_refcount refcount; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + CFStreamHandle* stream_sync; + + grpc_timer alarm; + grpc_closure on_alarm; + grpc_closure on_open; + + bool read_stream_open; + bool write_stream_open; + bool failed; + + grpc_closure* closure; + grpc_endpoint** endpoint; + int refs; + char* addr_name; + grpc_resource_quota* resource_quota; +} CFStreamConnect; + +static void CFStreamConnectCleanup(CFStreamConnect* connect) { + grpc_resource_quota_unref_internal(connect->resource_quota); + CFSTREAM_HANDLE_UNREF(connect->stream_sync, "async connect clean up"); + CFRelease(connect->read_stream); + CFRelease(connect->write_stream); + gpr_mu_destroy(&connect->mu); + gpr_free(connect->addr_name); + gpr_free(connect); +} + +static void OnAlarm(void* arg, grpc_error* error) { + CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%p", connect, error); + } + gpr_mu_lock(&connect->mu); + grpc_closure* closure = connect->closure; + connect->closure = nil; + const bool done = (--connect->refs == 0); + gpr_mu_unlock(&connect->mu); + // Only schedule a callback once, by either OnAlarm or OnOpen. The + // first one issues callback while the second one does cleanup. + if (done) { + CFStreamConnectCleanup(connect); + } else { + grpc_error* error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"); + GRPC_CLOSURE_SCHED(closure, error); + } +} + +static void OnOpen(void* arg, grpc_error* error) { + CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%p", connect, error); + } + gpr_mu_lock(&connect->mu); + grpc_timer_cancel(&connect->alarm); + grpc_closure* closure = connect->closure; + connect->closure = nil; + + bool done = (--connect->refs == 0); + grpc_endpoint** endpoint = connect->endpoint; + + // Only schedule a callback once, by either OnAlarm or OnOpen. The + // first one issues callback while the second one does cleanup. + if (done) { + gpr_mu_unlock(&connect->mu); + CFStreamConnectCleanup(connect); + } else { + if (error == GRPC_ERROR_NONE) { + CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream); + if (stream_error == NULL) { + stream_error = CFWriteStreamCopyError(connect->write_stream); + } + if (stream_error) { + error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error"); + CFRelease(stream_error); + } + if (error == GRPC_ERROR_NONE) { + *endpoint = grpc_cfstream_endpoint_create( + connect->read_stream, connect->write_stream, connect->addr_name, + connect->resource_quota, connect->stream_sync); + } + } else { + GRPC_ERROR_REF(error); + } + gpr_mu_unlock(&connect->mu); + GRPC_CLOSURE_SCHED(closure, error); + } +} + +static void ParseResolvedAddress(const grpc_resolved_address* addr, + CFStringRef* host, int* port) { + char *host_port, *host_string, *port_string; + grpc_sockaddr_to_string(&host_port, addr, 1); + gpr_split_host_port(host_port, &host_string, &port_string); + *host = CFStringCreateWithCString(NULL, host_string, kCFStringEncodingUTF8); + gpr_free(host_string); + gpr_free(port_string); + gpr_free(host_port); + *port = grpc_sockaddr_get_port(addr); +} + +static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* resolved_addr, + grpc_millis deadline) { + CFStreamConnect* connect; + + connect = (CFStreamConnect*)gpr_zalloc(sizeof(CFStreamConnect)); + connect->closure = closure; + connect->endpoint = ep; + connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); + // connect->resource_quota = resource_quota; + connect->refs = 2; // One for the connect operation, one for the timer. + gpr_ref_init(&connect->refcount, 1); + gpr_mu_init(&connect->mu); + + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", + connect->addr_name); + } + + grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_unref_internal(resource_quota); + resource_quota = grpc_resource_quota_ref_internal( + (grpc_resource_quota*)channel_args->args[i].value.pointer.p); + } + } + } + connect->resource_quota = resource_quota; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + + CFStringRef host; + int port; + ParseResolvedAddress(resolved_addr, &host, &port); + CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream, + &write_stream); + CFRelease(host); + connect->read_stream = read_stream; + connect->write_stream = write_stream; + connect->stream_sync = + CFStreamHandle::CreateStreamHandle(read_stream, write_stream); + GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect), + grpc_schedule_on_exec_ctx); + connect->stream_sync->NotifyOnOpen(&connect->on_open); + GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect, + grpc_schedule_on_exec_ctx); + gpr_mu_lock(&connect->mu); + CFReadStreamOpen(read_stream); + CFWriteStreamOpen(write_stream); + grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm); + gpr_mu_unlock(&connect->mu); +} + +grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {CFStreamClientConnect}; + +#endif /* GRPC_CFSTREAM_CLIENT */ diff --git a/src/core/lib/iomgr/tcp_client_custom.cc b/src/core/lib/iomgr/tcp_client_custom.cc index 932c79ea0b..9389861d07 100644 --- a/src/core/lib/iomgr/tcp_client_custom.cc +++ b/src/core/lib/iomgr/tcp_client_custom.cc @@ -140,12 +140,12 @@ static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep, socket, connect->addr_name); } - grpc_custom_socket_vtable->connect( - socket, (const grpc_sockaddr*)resolved_addr->addr, resolved_addr->len, - custom_connect_callback); GRPC_CLOSURE_INIT(&connect->on_alarm, on_alarm, socket, grpc_schedule_on_exec_ctx); grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm); + grpc_custom_socket_vtable->connect( + socket, (const grpc_sockaddr*)resolved_addr->addr, resolved_addr->len, + custom_connect_callback); } grpc_tcp_client_vtable custom_tcp_client_vtable = {tcp_connect}; diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 1b43258a91..71e08f1230 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_TCP_CLIENT #include "src/core/lib/iomgr/tcp_client_posix.h" diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index b79ffe20f1..43d545846d 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_TCP #include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/tcp_posix.h" @@ -70,7 +70,9 @@ struct grpc_tcp { grpc_endpoint base; grpc_fd* em_fd; int fd; - bool finished_edge; + /* Used by the endpoint read function to distinguish the very first read call + * from the rest */ + bool is_first_read; double target_length; double bytes_read_this_round; gpr_refcount refcount; @@ -377,7 +379,6 @@ static void tcp_do_read(grpc_tcp* tcp) { ssize_t read_bytes; size_t i; - GPR_ASSERT(!tcp->finished_edge); GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC); for (i = 0; i < tcp->incoming_buffer->count; i++) { @@ -473,7 +474,6 @@ static void tcp_continue_read(grpc_tcp* tcp) { static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) { grpc_tcp* tcp = static_cast<grpc_tcp*>(arg); - GPR_ASSERT(!tcp->finished_edge); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp, grpc_error_string(error)); } @@ -497,10 +497,17 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, grpc_slice_buffer_reset_and_unref_internal(incoming_buffer); grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); - if (tcp->finished_edge) { - tcp->finished_edge = false; + if (tcp->is_first_read) { + /* Endpoint read called for the very first time. Register read callback with + * the polling engine */ + tcp->is_first_read = false; notify_on_read(tcp); } else { + /* Not the first time. We may or may not have more bytes available. In any + * case call tcp->read_done_closure (i.e tcp_handle_read()) which does the + * right thing (i.e calls tcp_do_read() which either reads the available + * bytes or calls notify_on_read() to be notified when new bytes become + * available */ GRPC_CLOSURE_SCHED(&tcp->read_done_closure, GRPC_ERROR_NONE); } } @@ -778,7 +785,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->min_read_chunk_size = tcp_min_read_chunk_size; tcp->max_read_chunk_size = tcp_max_read_chunk_size; tcp->bytes_read_this_round = 0; - tcp->finished_edge = true; + /* Will be set to false by the very first endpoint read function */ + tcp->is_first_read = true; /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); @@ -811,4 +819,4 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, TCP_UNREF(tcp, "destroy"); } -#endif +#endif /* GRPC_POSIX_SOCKET_TCP */ diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index b39a0754ba..ce18ff99e6 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -25,7 +25,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_TCP_SERVER #include "src/core/lib/iomgr/tcp_server.h" @@ -559,4 +559,4 @@ grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = { tcp_server_shutdown_starting_add, tcp_server_unref, tcp_server_shutdown_listeners}; -#endif +#endif /* GRPC_POSIX_SOCKET_TCP_SERVER */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index f21f4ad015..b9f8145572 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON #include "src/core/lib/iomgr/tcp_server_utils_posix.h" @@ -217,4 +217,4 @@ error: return ret; } -#endif /* GRPC_POSIX_SOCKET */ +#endif /* GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON */ diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc index 2129029737..43dd68e874 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc @@ -219,9 +219,11 @@ static void on_oauth2_token_fetcher_http_response(void* user_data, gpr_mu_lock(&c->mu); c->token_fetch_pending = false; c->access_token_md = GRPC_MDELEM_REF(access_token_md); - c->token_expiration = status == GRPC_CREDENTIALS_OK - ? grpc_core::ExecCtx::Get()->Now() + token_lifetime - : 0; + c->token_expiration = + status == GRPC_CREDENTIALS_OK + ? gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(token_lifetime, GPR_TIMESPAN)) + : gpr_inf_past(GPR_CLOCK_MONOTONIC); grpc_oauth2_pending_get_request_metadata* pending_request = c->pending_requests; c->pending_requests = nullptr; @@ -259,8 +261,10 @@ static bool oauth2_token_fetcher_get_request_metadata( grpc_mdelem cached_access_token_md = GRPC_MDNULL; gpr_mu_lock(&c->mu); if (!GRPC_MDISNULL(c->access_token_md) && - (c->token_expiration - grpc_core::ExecCtx::Get()->Now() > - refresh_threshold)) { + gpr_time_cmp( + gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_MONOTONIC)), + gpr_time_from_seconds(GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, + GPR_TIMESPAN)) > 0) { cached_access_token_md = GRPC_MDELEM_REF(c->access_token_md); } if (!GRPC_MDISNULL(cached_access_token_md)) { @@ -333,7 +337,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials* c, c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2; gpr_ref_init(&c->base.refcount, 1); gpr_mu_init(&c->mu); - c->token_expiration = 0; + c->token_expiration = gpr_inf_past(GPR_CLOCK_MONOTONIC); c->fetch_func = fetch_func; c->pollent = grpc_polling_entity_create_from_pollset_set(grpc_pollset_set_create()); diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h index c0dd1546e3..12a1d4484f 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h @@ -71,7 +71,7 @@ typedef struct { grpc_call_credentials base; gpr_mu mu; grpc_mdelem access_token_md; - grpc_millis token_expiration; + gpr_timespec token_expiration; bool token_fetch_pending; grpc_oauth2_pending_get_request_metadata* pending_requests; grpc_httpcli_context httpcli_context; diff --git a/src/core/lib/security/util/json_util.cc b/src/core/lib/security/util/json_util.cc index 75512a19c9..fe9f5fe3d3 100644 --- a/src/core/lib/security/util/json_util.cc +++ b/src/core/lib/security/util/json_util.cc @@ -29,6 +29,10 @@ const char* grpc_json_get_string_property(const grpc_json* json, const char* prop_name) { grpc_json* child; for (child = json->child; child != nullptr; child = child->next) { + if (child->key == nullptr) { + gpr_log(GPR_ERROR, "Invalid (null) JSON key encountered"); + return nullptr; + } if (strcmp(child->key, prop_name) == 0) break; } if (child == nullptr || child->type != GRPC_JSON_STRING) { diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc index fd56997388..1f1c08b159 100644 --- a/src/core/lib/slice/slice_buffer.cc +++ b/src/core/lib/slice/slice_buffer.cc @@ -333,14 +333,26 @@ void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n, size_t slice_len = GRPC_SLICE_LENGTH(slice); if (slice_len > n) { sb->slices[idx] = grpc_slice_split_head(&slice, slice_len - n); - grpc_slice_buffer_add_indexed(garbage, slice); + if (garbage) { + grpc_slice_buffer_add_indexed(garbage, slice); + } else { + grpc_slice_unref_internal(slice); + } return; } else if (slice_len == n) { - grpc_slice_buffer_add_indexed(garbage, slice); + if (garbage) { + grpc_slice_buffer_add_indexed(garbage, slice); + } else { + grpc_slice_unref_internal(slice); + } sb->count = idx; return; } else { - grpc_slice_buffer_add_indexed(garbage, slice); + if (garbage) { + grpc_slice_buffer_add_indexed(garbage, slice); + } else { + grpc_slice_unref_internal(slice); + } n -= slice_len; sb->count = idx; } diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 86e0afa6ee..1cf8ea94e7 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -516,7 +516,6 @@ static void release_call(void* call, grpc_error* error) { grpc_call* c = static_cast<grpc_call*>(call); grpc_channel* channel = c->channel; grpc_call_combiner_destroy(&c->call_combiner); - gpr_free((char*)c->peer_string); grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); GRPC_CHANNEL_INTERNAL_UNREF(channel, "call"); } diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 6b41e4b37e..039d603394 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -184,7 +184,8 @@ void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream, nullptr) { transport->vtable->set_pollset_set(transport, stream, pollset_set); } else { - abort(); + // No-op for empty pollset. Empty pollset is possible when using + // non-fd-based event engines such as CFStream. } } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 10e9df0f7c..b2e252d939 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -168,13 +168,11 @@ struct grpc_transport_stream_op_batch_payload { /** Iff send_initial_metadata != NULL, flags associated with send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */ uint32_t send_initial_metadata_flags; - // If non-NULL, will be set by the transport to the peer string - // (a char*, which the caller takes ownership of). + // If non-NULL, will be set by the transport to the peer string (a char*). + // The transport retains ownership of the string. // Note: This pointer may be used by the transport after the // send_initial_metadata op is completed. It must remain valid // until the call is destroyed. - // Note: When a transport sets this, it must free the previous - // value, if any. gpr_atm* peer_string; } send_initial_metadata; @@ -202,13 +200,11 @@ struct grpc_transport_stream_op_batch_payload { // immediately available. This may be a signal that we received a // Trailers-Only response. bool* trailing_metadata_available; - // If non-NULL, will be set by the transport to the peer string - // (a char*, which the caller takes ownership of). + // If non-NULL, will be set by the transport to the peer string (a char*). + // The transport retains ownership of the string. // Note: This pointer may be used by the transport after the // recv_initial_metadata op is completed. It must remain valid // until the call is destroyed. - // Note: When a transport sets this, it must free the previous - // value, if any. gpr_atm* peer_string; } recv_initial_metadata; |