aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-06-11 16:33:15 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-06-11 16:33:15 -0700
commitdddf20793269a7ba5f658ae5fdcde2c175cf196b (patch)
tree595d0fbc0c501132857b63f00f073ec6a0f54f58 /src/core
parent935ae7d01242c21f0230ead18927087a70c03e5b (diff)
parent37b292a0f7500e434c9ae41919248e2641ddc7f1 (diff)
Merge branch 'master' into epollerr
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc520
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc4
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc12
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.cc16
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.h7
-rw-r--r--src/core/lib/iomgr/cfstream_handle.cc183
-rw-r--r--src/core/lib/iomgr/cfstream_handle.h80
-rw-r--r--src/core/lib/iomgr/endpoint_cfstream.cc372
-rw-r--r--src/core/lib/iomgr/endpoint_cfstream.h49
-rw-r--r--src/core/lib/iomgr/error.cc12
-rw-r--r--src/core/lib/iomgr/error.h5
-rw-r--r--src/core/lib/iomgr/error_cfstream.cc52
-rw-r--r--src/core/lib/iomgr/error_cfstream.h31
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc4
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc4
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc4
-rw-r--r--src/core/lib/iomgr/ev_posix.cc4
-rw-r--r--src/core/lib/iomgr/iomgr_posix.cc4
-rw-r--r--src/core/lib/iomgr/polling_entity.cc13
-rw-r--r--src/core/lib/iomgr/port.h39
-rw-r--r--src/core/lib/iomgr/resolve_address.h2
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.cc2
-rw-r--r--src/core/lib/iomgr/sockaddr_posix.h2
-rw-r--r--src/core/lib/iomgr/socket_factory_posix.cc2
-rw-r--r--src/core/lib/iomgr/socket_utils_common_posix.cc2
-rw-r--r--src/core/lib/iomgr/tcp_client_cfstream.cc216
-rw-r--r--src/core/lib/iomgr/tcp_client_custom.cc6
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc2
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc24
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.cc4
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.cc4
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.cc16
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.h2
-rw-r--r--src/core/lib/security/util/json_util.cc4
-rw-r--r--src/core/lib/slice/slice_buffer.cc18
-rw-r--r--src/core/lib/surface/call.cc1
-rw-r--r--src/core/lib/transport/transport.cc3
-rw-r--r--src/core/lib/transport/transport.h12
39 files changed, 1422 insertions, 317 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 936d70b959..ea6775a8d8 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -891,6 +891,7 @@ typedef struct client_channel_call_data {
grpc_closure pick_cancel_closure;
grpc_polling_entity* pollent;
+ bool pollent_added_to_interested_parties;
// Batches are added to this list when received from above.
// They are removed when we are done handling the batch (i.e., when
@@ -949,7 +950,6 @@ static void retry_commit(grpc_call_element* elem,
static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
-static void pick_after_resolver_result_start_locked(grpc_call_element* elem);
static void start_pick_locked(void* arg, grpc_error* ignored);
//
@@ -2684,59 +2684,133 @@ static void pick_done(void* arg, grpc_error* error) {
}
}
+static void maybe_add_call_to_channel_interested_parties_locked(
+ grpc_call_element* elem) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (!calld->pollent_added_to_interested_parties) {
+ calld->pollent_added_to_interested_parties = true;
+ grpc_polling_entity_add_to_pollset_set(calld->pollent,
+ chand->interested_parties);
+ }
+}
+
+static void maybe_del_call_from_channel_interested_parties_locked(
+ grpc_call_element* elem) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (calld->pollent_added_to_interested_parties) {
+ calld->pollent_added_to_interested_parties = false;
+ grpc_polling_entity_del_from_pollset_set(calld->pollent,
+ chand->interested_parties);
+ }
+}
+
// Invoked when a pick is completed to leave the client_channel combiner
// and continue processing in the call combiner.
+// If needed, removes the call's polling entity from chand->interested_parties.
static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
call_data* calld = static_cast<call_data*>(elem->call_data);
+ maybe_del_call_from_channel_interested_parties_locked(elem);
GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
}
-// A wrapper around pick_done_locked() that is used in cases where
-// either (a) the pick was deferred pending a resolver result or (b) the
-// pick was done asynchronously. Removes the call's polling entity from
-// chand->interested_parties before invoking pick_done_locked().
-static void async_pick_done_locked(grpc_call_element* elem, grpc_error* error) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- grpc_polling_entity_del_from_pollset_set(calld->pollent,
- chand->interested_parties);
- pick_done_locked(elem, error);
-}
+namespace grpc_core {
-// Note: This runs under the client_channel combiner, but will NOT be
-// holding the call combiner.
-static void pick_callback_cancel_locked(void* arg, grpc_error* error) {
- grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- // Note: chand->lb_policy may have changed since we started our pick,
- // in which case we will be cancelling the pick on a policy other than
- // the one we started it on. However, this will just be a no-op.
- if (GPR_LIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) {
+// Performs subchannel pick via LB policy.
+class LbPicker {
+ public:
+ // Starts a pick on chand->lb_policy.
+ static void StartLocked(grpc_call_element* elem) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling pick from LB policy %p",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p",
chand, calld, chand->lb_policy.get());
}
- chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
+ // If this is a retry, use the send_initial_metadata payload that
+ // we've cached; otherwise, use the pending batch. The
+ // send_initial_metadata batch will be the first pending batch in the
+ // list, as set by get_batch_index() above.
+ calld->pick.initial_metadata =
+ calld->seen_send_initial_metadata
+ ? &calld->send_initial_metadata
+ : calld->pending_batches[0]
+ .batch->payload->send_initial_metadata.send_initial_metadata;
+ calld->pick.initial_metadata_flags =
+ calld->seen_send_initial_metadata
+ ? calld->send_initial_metadata_flags
+ : calld->pending_batches[0]
+ .batch->payload->send_initial_metadata
+ .send_initial_metadata_flags;
+ GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem,
+ grpc_combiner_scheduler(chand->combiner));
+ calld->pick.on_complete = &calld->pick_closure;
+ GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
+ const bool pick_done = chand->lb_policy->PickLocked(&calld->pick);
+ if (GPR_LIKELY(pick_done)) {
+ // Pick completed synchronously.
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
+ chand, calld);
+ }
+ pick_done_locked(elem, GRPC_ERROR_NONE);
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
+ } else {
+ // Pick will be returned asynchronously.
+ // Add the polling entity from call_data to the channel_data's
+ // interested_parties, so that the I/O of the LB policy can be done
+ // under it. It will be removed in pick_done_locked().
+ maybe_add_call_to_channel_interested_parties_locked(elem);
+ // Request notification on call cancellation.
+ GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
+ grpc_call_combiner_set_notify_on_cancel(
+ calld->call_combiner,
+ GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
+ &LbPicker::CancelLocked, elem,
+ grpc_combiner_scheduler(chand->combiner)));
+ }
}
- GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
-}
-// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
-// Unrefs the LB policy and invokes async_pick_done_locked().
-static void pick_callback_done_locked(void* arg, grpc_error* error) {
- grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", chand,
- calld);
+ private:
+ // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
+ // Unrefs the LB policy and invokes pick_done_locked().
+ static void DoneLocked(void* arg, grpc_error* error) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously",
+ chand, calld);
+ }
+ pick_done_locked(elem, GRPC_ERROR_REF(error));
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
}
- async_pick_done_locked(elem, GRPC_ERROR_REF(error));
- GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
-}
+
+ // Note: This runs under the client_channel combiner, but will NOT be
+ // holding the call combiner.
+ static void CancelLocked(void* arg, grpc_error* error) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ // Note: chand->lb_policy may have changed since we started our pick,
+ // in which case we will be cancelling the pick on a policy other than
+ // the one we started it on. However, this will just be a no-op.
+ if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: cancelling pick from LB policy %p", chand,
+ calld, chand->lb_policy.get());
+ }
+ chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
+ }
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
+ }
+};
+
+} // namespace grpc_core
// Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel.
@@ -2766,6 +2840,24 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) {
grpc_deadline_state_reset(elem, calld->deadline);
}
}
+ // If the service config set wait_for_ready and the application
+ // did not explicitly set it, use the value from the service config.
+ uint32_t* send_initial_metadata_flags =
+ &calld->pending_batches[0]
+ .batch->payload->send_initial_metadata
+ .send_initial_metadata_flags;
+ if (GPR_UNLIKELY(
+ calld->method_params->wait_for_ready() !=
+ ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
+ !(*send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
+ if (calld->method_params->wait_for_ready() ==
+ ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
+ *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
+ } else {
+ *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
+ }
+ }
}
}
// If no retry policy, disable retries.
@@ -2776,215 +2868,164 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) {
}
}
-// Starts a pick on chand->lb_policy.
-// Returns true if pick is completed synchronously.
-static bool pick_callback_start_locked(grpc_call_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+// Invoked once resolver results are available.
+static void process_service_config_and_start_lb_pick_locked(
+ grpc_call_element* elem) {
call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", chand,
- calld, chand->lb_policy.get());
- }
// Only get service config data on the first attempt.
if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
apply_service_config_to_call_locked(elem);
}
- // If the application explicitly set wait_for_ready, use that.
- // Otherwise, if the service config specified a value for this
- // method, use that.
- //
- // The send_initial_metadata batch will be the first one in the list,
- // as set by get_batch_index() above.
- calld->pick.initial_metadata =
- calld->seen_send_initial_metadata
- ? &calld->send_initial_metadata
- : calld->pending_batches[0]
- .batch->payload->send_initial_metadata.send_initial_metadata;
- uint32_t send_initial_metadata_flags =
- calld->seen_send_initial_metadata
- ? calld->send_initial_metadata_flags
- : calld->pending_batches[0]
- .batch->payload->send_initial_metadata
- .send_initial_metadata_flags;
- const bool wait_for_ready_set_from_api =
- send_initial_metadata_flags &
- GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
- const bool wait_for_ready_set_from_service_config =
- calld->method_params != nullptr &&
- calld->method_params->wait_for_ready() !=
- ClientChannelMethodParams::WAIT_FOR_READY_UNSET;
- if (GPR_UNLIKELY(!wait_for_ready_set_from_api &&
- wait_for_ready_set_from_service_config)) {
- if (calld->method_params->wait_for_ready() ==
- ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
- send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
- } else {
- send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
- }
- }
- calld->pick.initial_metadata_flags = send_initial_metadata_flags;
- GRPC_CLOSURE_INIT(&calld->pick_closure, pick_callback_done_locked, elem,
- grpc_combiner_scheduler(chand->combiner));
- calld->pick.on_complete = &calld->pick_closure;
- GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
- const bool pick_done = chand->lb_policy->PickLocked(&calld->pick);
- if (GPR_LIKELY(pick_done)) {
- // Pick completed synchronously.
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
- chand, calld);
- }
- GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
- } else {
- GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
- grpc_call_combiner_set_notify_on_cancel(
- calld->call_combiner,
- GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
- pick_callback_cancel_locked, elem,
- grpc_combiner_scheduler(chand->combiner)));
- }
- return pick_done;
+ // Start LB pick.
+ grpc_core::LbPicker::StartLocked(elem);
}
-typedef struct {
- grpc_call_element* elem;
- bool finished;
- grpc_closure closure;
- grpc_closure cancel_closure;
-} pick_after_resolver_result_args;
-
-// Note: This runs under the client_channel combiner, but will NOT be
-// holding the call combiner.
-static void pick_after_resolver_result_cancel_locked(void* arg,
- grpc_error* error) {
- pick_after_resolver_result_args* args =
- static_cast<pick_after_resolver_result_args*>(arg);
- if (GPR_LIKELY(args->finished)) {
- gpr_free(args);
- return;
- }
- // If we don't yet have a resolver result, then a closure for
- // pick_after_resolver_result_done_locked() will have been added to
- // chand->waiting_for_resolver_result_closures, and it may not be invoked
- // until after this call has been destroyed. We mark the operation as
- // finished, so that when pick_after_resolver_result_done_locked()
- // is called, it will be a no-op. We also immediately invoke
- // async_pick_done_locked() to propagate the error back to the caller.
- args->finished = true;
- grpc_call_element* elem = args->elem;
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: cancelling pick waiting for resolver result",
- chand, calld);
- }
- // Note: Although we are not in the call combiner here, we are
- // basically stealing the call combiner from the pending pick, so
- // it's safe to call async_pick_done_locked() here -- we are
- // essentially calling it here instead of calling it in
- // pick_after_resolver_result_done_locked().
- async_pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Pick cancelled", &error, 1));
-}
-
-static void pick_after_resolver_result_done_locked(void* arg,
- grpc_error* error) {
- pick_after_resolver_result_args* args =
- static_cast<pick_after_resolver_result_args*>(arg);
- if (GPR_UNLIKELY(args->finished)) {
- /* cancelled, do nothing */
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "call cancelled before resolver result");
- }
- gpr_free(args);
- return;
- }
- args->finished = true;
- grpc_call_element* elem = args->elem;
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
+namespace grpc_core {
+
+// Handles waiting for a resolver result.
+// Used only for the first call on an idle channel.
+class ResolverResultWaiter {
+ public:
+ explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data",
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: deferring pick pending resolver result",
chand, calld);
}
- async_pick_done_locked(elem, GRPC_ERROR_REF(error));
- } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
- // Shutting down.
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand,
- calld);
+ // Add closure to be run when a resolver result is available.
+ GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this,
+ grpc_combiner_scheduler(chand->combiner));
+ AddToWaitingList();
+ // Set cancellation closure, so that we abort if the call is cancelled.
+ GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked,
+ this, grpc_combiner_scheduler(chand->combiner));
+ grpc_call_combiner_set_notify_on_cancel(calld->call_combiner,
+ &cancel_closure_);
+ }
+
+ private:
+ // Adds closure_ to chand->waiting_for_resolver_result_closures.
+ void AddToWaitingList() {
+ channel_data* chand = static_cast<channel_data*>(elem_->channel_data);
+ grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
+ &done_closure_, GRPC_ERROR_NONE);
+ }
+
+ // Invoked when a resolver result is available.
+ static void DoneLocked(void* arg, grpc_error* error) {
+ ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
+ // If CancelLocked() has already run, delete ourselves without doing
+ // anything. Note that the call stack may have already been destroyed,
+ // so it's not safe to access anything in elem_.
+ if (GPR_UNLIKELY(self->finished_)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "call cancelled before resolver result");
+ }
+ Delete(self);
+ return;
}
- async_pick_done_locked(
- elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
- } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) {
- // Transient resolver failure.
- // If call has wait_for_ready=true, try again; otherwise, fail.
- uint32_t send_initial_metadata_flags =
- calld->seen_send_initial_metadata
- ? calld->send_initial_metadata_flags
- : calld->pending_batches[0]
- .batch->payload->send_initial_metadata
- .send_initial_metadata_flags;
- if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
+ // Otherwise, process the resolver result.
+ grpc_call_element* elem = self->elem_;
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: resolver returned but no LB policy; "
- "wait_for_ready=true; trying again",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data",
chand, calld);
}
- pick_after_resolver_result_start_locked(elem);
+ pick_done_locked(elem, GRPC_ERROR_REF(error));
+ } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
+ // Shutting down.
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand,
+ calld);
+ }
+ pick_done_locked(elem,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
+ } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) {
+ // Transient resolver failure.
+ // If call has wait_for_ready=true, try again; otherwise, fail.
+ uint32_t send_initial_metadata_flags =
+ calld->seen_send_initial_metadata
+ ? calld->send_initial_metadata_flags
+ : calld->pending_batches[0]
+ .batch->payload->send_initial_metadata
+ .send_initial_metadata_flags;
+ if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: resolver returned but no LB policy; "
+ "wait_for_ready=true; trying again",
+ chand, calld);
+ }
+ // Re-add ourselves to the waiting list.
+ self->AddToWaitingList();
+ // Return early so that we don't set finished_ to true below.
+ return;
+ } else {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: resolver returned but no LB policy; "
+ "wait_for_ready=false; failing",
+ chand, calld);
+ }
+ pick_done_locked(
+ elem,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
+ }
} else {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: resolver returned but no LB policy; "
- "wait_for_ready=false; failing",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick",
chand, calld);
}
- async_pick_done_locked(
- elem,
- grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
+ process_service_config_and_start_lb_pick_locked(elem);
}
- } else {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing pick",
- chand, calld);
+ self->finished_ = true;
+ }
+
+ // Invoked when the call is cancelled.
+ // Note: This runs under the client_channel combiner, but will NOT be
+ // holding the call combiner.
+ static void CancelLocked(void* arg, grpc_error* error) {
+ ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
+ // If DoneLocked() has already run, delete ourselves without doing anything.
+ if (GPR_LIKELY(self->finished_)) {
+ Delete(self);
+ return;
}
- if (GPR_LIKELY(pick_callback_start_locked(elem))) {
- // Even if the LB policy returns a result synchronously, we have
- // already added our polling entity to chand->interested_parties
- // in order to wait for the resolver result, so we need to
- // remove it here. Therefore, we call async_pick_done_locked()
- // instead of pick_done_locked().
- async_pick_done_locked(elem, GRPC_ERROR_NONE);
+ // If we are being cancelled, immediately invoke pick_done_locked()
+ // to propagate the error back to the caller.
+ if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
+ grpc_call_element* elem = self->elem_;
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: cancelling call waiting for name "
+ "resolution",
+ chand, calld);
+ }
+ // Note: Although we are not in the call combiner here, we are
+ // basically stealing the call combiner from the pending pick, so
+ // it's safe to call pick_done_locked() here -- we are essentially
+ // calling it here instead of calling it in DoneLocked().
+ pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Pick cancelled", &error, 1));
}
+ self->finished_ = true;
}
-}
-static void pick_after_resolver_result_start_locked(grpc_call_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: deferring pick pending resolver result", chand,
- calld);
- }
- pick_after_resolver_result_args* args =
- static_cast<pick_after_resolver_result_args*>(gpr_zalloc(sizeof(*args)));
- args->elem = elem;
- GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked,
- args, grpc_combiner_scheduler(chand->combiner));
- grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
- &args->closure, GRPC_ERROR_NONE);
- grpc_call_combiner_set_notify_on_cancel(
- calld->call_combiner,
- GRPC_CLOSURE_INIT(&args->cancel_closure,
- pick_after_resolver_result_cancel_locked, args,
- grpc_combiner_scheduler(chand->combiner)));
-}
+ grpc_call_element* elem_;
+ grpc_closure done_closure_;
+ grpc_closure cancel_closure_;
+ bool finished_ = false;
+};
+
+} // namespace grpc_core
static void start_pick_locked(void* arg, grpc_error* ignored) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
@@ -2993,31 +3034,24 @@ static void start_pick_locked(void* arg, grpc_error* ignored) {
GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
GPR_ASSERT(calld->subchannel_call == nullptr);
if (GPR_LIKELY(chand->lb_policy != nullptr)) {
- // We already have an LB policy, so ask it for a pick.
- if (GPR_LIKELY(pick_callback_start_locked(elem))) {
- // Pick completed synchronously.
- pick_done_locked(elem, GRPC_ERROR_NONE);
- return;
- }
+ // We already have resolver results, so process the service config
+ // and start an LB pick.
+ process_service_config_and_start_lb_pick_locked(elem);
+ } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
+ pick_done_locked(elem,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
} else {
// We do not yet have an LB policy, so wait for a resolver result.
- if (GPR_UNLIKELY(chand->resolver == nullptr)) {
- pick_done_locked(elem,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
- return;
- }
if (GPR_UNLIKELY(!chand->started_resolving)) {
start_resolving_locked(chand);
}
- pick_after_resolver_result_start_locked(elem);
+ // Create a new waiter, which will delete itself when done.
+ grpc_core::New<grpc_core::ResolverResultWaiter>(elem);
+ // Add the polling entity from call_data to the channel_data's
+ // interested_parties, so that the I/O of the resolver can be done
+ // under it. It will be removed in pick_done_locked().
+ maybe_add_call_to_channel_interested_parties_locked(elem);
}
- // We need to wait for either a resolver result or for an async result
- // from the LB policy. Add the polling entity from call_data to the
- // channel_data's interested_parties, so that the I/O of the LB policy
- // and resolver can be done under it. The polling entity will be
- // removed in async_pick_done_locked().
- grpc_polling_entity_add_to_pollset_set(calld->pollent,
- chand->interested_parties);
}
//
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
index 73d2af6e42..00a84302ed 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
@@ -18,7 +18,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
-#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET)
+#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
#include <ares.h>
#include <sys/ioctl.h>
@@ -348,4 +348,4 @@ void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver) {
gpr_mu_unlock(&ev_driver->mu);
}
-#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) */
+#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index cc4a823798..a8090d18a6 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1451,10 +1451,8 @@ static void perform_stream_op_locked(void* stream_op,
}
}
if (op_payload->send_initial_metadata.peer_string != nullptr) {
- char* old_peer_string = (char*)gpr_atm_full_xchg(
- op_payload->send_initial_metadata.peer_string,
- (gpr_atm)gpr_strdup(t->peer_string));
- gpr_free(old_peer_string);
+ gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
+ (gpr_atm)t->peer_string);
}
}
@@ -1569,10 +1567,8 @@ static void perform_stream_op_locked(void* stream_op,
s->trailing_metadata_available =
op_payload->recv_initial_metadata.trailing_metadata_available;
if (op_payload->recv_initial_metadata.peer_string != nullptr) {
- char* old_peer_string = (char*)gpr_atm_full_xchg(
- op_payload->recv_initial_metadata.peer_string,
- (gpr_atm)gpr_strdup(t->peer_string));
- gpr_free(old_peer_string);
+ gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
+ (gpr_atm)t->peer_string);
}
grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc
index d5ef063883..0eaf63f133 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc
@@ -41,14 +41,18 @@
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/timeout_encoding.h"
-#define HASH_FRAGMENT_1(x) ((x)&255)
-#define HASH_FRAGMENT_2(x) ((x >> 8) & 255)
-#define HASH_FRAGMENT_3(x) ((x >> 16) & 255)
-#define HASH_FRAGMENT_4(x) ((x >> 24) & 255)
+#define HASH_FRAGMENT_MASK (GRPC_CHTTP2_HPACKC_NUM_VALUES - 1)
+#define HASH_FRAGMENT_1(x) ((x)&HASH_FRAGMENT_MASK)
+#define HASH_FRAGMENT_2(x) \
+ (((x) >> GRPC_CHTTP2_HPACKC_NUM_VALUES_BITS) & HASH_FRAGMENT_MASK)
+#define HASH_FRAGMENT_3(x) \
+ (((x) >> (GRPC_CHTTP2_HPACKC_NUM_VALUES_BITS * 2)) & HASH_FRAGMENT_MASK)
+#define HASH_FRAGMENT_4(x) \
+ (((x) >> (GRPC_CHTTP2_HPACKC_NUM_VALUES_BITS * 3)) & HASH_FRAGMENT_MASK)
/* if the probability of this item being seen again is < 1/x then don't add
it to the table */
-#define ONE_ON_ADD_PROBABILITY 128
+#define ONE_ON_ADD_PROBABILITY (GRPC_CHTTP2_HPACKC_NUM_VALUES >> 1)
/* don't consider adding anything bigger than this to the hpack table */
#define MAX_DECODER_SPACE_USAGE 512
@@ -135,7 +139,7 @@ static void inc_filter(uint8_t idx, uint32_t* sum, uint8_t* elems) {
} else {
int i;
*sum = 0;
- for (i = 0; i < GRPC_CHTTP2_HPACKC_NUM_FILTERS; i++) {
+ for (i = 0; i < GRPC_CHTTP2_HPACKC_NUM_VALUES; i++) {
elems[i] /= 2;
(*sum) += elems[i];
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.h b/src/core/ext/transport/chttp2/transport/hpack_encoder.h
index b370932131..e31a7399d7 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.h
@@ -28,8 +28,9 @@
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
-#define GRPC_CHTTP2_HPACKC_NUM_FILTERS 256
-#define GRPC_CHTTP2_HPACKC_NUM_VALUES 256
+// This should be <= 8. We use 6 to save space.
+#define GRPC_CHTTP2_HPACKC_NUM_VALUES_BITS 6
+#define GRPC_CHTTP2_HPACKC_NUM_VALUES (1 << GRPC_CHTTP2_HPACKC_NUM_VALUES_BITS)
/* initial table size, per spec */
#define GRPC_CHTTP2_HPACKC_INITIAL_TABLE_SIZE 4096
/* maximum table size we'll actually use */
@@ -58,7 +59,7 @@ typedef struct {
a new literal should be added to the compression table or not.
They track a single integer that counts how often a particular value has
been seen. When that count reaches max (255), all values are halved. */
- uint8_t filter_elems[GRPC_CHTTP2_HPACKC_NUM_FILTERS];
+ uint8_t filter_elems[GRPC_CHTTP2_HPACKC_NUM_VALUES];
/* entry tables for keys & elems: these tables track values that have been
seen and *may* be in the decompressor table */
diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc
new file mode 100644
index 0000000000..30f4e65632
--- /dev/null
+++ b/src/core/lib/iomgr/cfstream_handle.cc
@@ -0,0 +1,183 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM
+#import <CoreFoundation/CoreFoundation.h>
+#import "src/core/lib/iomgr/cfstream_handle.h"
+
+#include <grpc/support/atm.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+void* CFStreamHandle::Retain(void* info) {
+ CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
+ CFSTREAM_HANDLE_REF(handle, "retain");
+ return info;
+}
+
+void CFStreamHandle::Release(void* info) {
+ CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
+ CFSTREAM_HANDLE_UNREF(handle, "release");
+}
+
+CFStreamHandle* CFStreamHandle::CreateStreamHandle(
+ CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
+ return new CFStreamHandle(read_stream, write_stream);
+}
+
+void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
+ CFStreamEventType type,
+ void* client_callback_info) {
+ CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
+ CFSTREAM_HANDLE_REF(handle, "read callback");
+ dispatch_async(
+ dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
+ grpc_core::ExecCtx exec_ctx;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
+ stream, type, client_callback_info);
+ }
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ handle->open_event_.SetReady();
+ break;
+ case kCFStreamEventHasBytesAvailable:
+ case kCFStreamEventEndEncountered:
+ handle->read_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred:
+ handle->open_event_.SetReady();
+ handle->read_event_.SetReady();
+ break;
+ default:
+ GPR_UNREACHABLE_CODE(return );
+ }
+ CFSTREAM_HANDLE_UNREF(handle, "read callback");
+ });
+}
+void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
+ CFStreamEventType type,
+ void* clientCallBackInfo) {
+ CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
+ CFSTREAM_HANDLE_REF(handle, "write callback");
+ dispatch_async(
+ dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
+ grpc_core::ExecCtx exec_ctx;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
+ stream, type, clientCallBackInfo);
+ }
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ handle->open_event_.SetReady();
+ break;
+ case kCFStreamEventCanAcceptBytes:
+ case kCFStreamEventEndEncountered:
+ handle->write_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred:
+ handle->open_event_.SetReady();
+ handle->write_event_.SetReady();
+ break;
+ default:
+ GPR_UNREACHABLE_CODE(return );
+ }
+ CFSTREAM_HANDLE_UNREF(handle, "write callback");
+ });
+}
+
+CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
+ CFWriteStreamRef write_stream) {
+ gpr_ref_init(&refcount_, 1);
+ open_event_.InitEvent();
+ read_event_.InitEvent();
+ write_event_.InitEvent();
+ CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil};
+ CFReadStreamSetClient(
+ read_stream,
+ kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
+ kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
+ CFStreamHandle::ReadCallback, &ctx);
+ CFWriteStreamSetClient(
+ write_stream,
+ kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
+ kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
+ CFStreamHandle::WriteCallback, &ctx);
+ CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(),
+ kCFRunLoopCommonModes);
+ CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(),
+ kCFRunLoopCommonModes);
+}
+
+CFStreamHandle::~CFStreamHandle() {
+ open_event_.DestroyEvent();
+ read_event_.DestroyEvent();
+ write_event_.DestroyEvent();
+}
+
+void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
+ open_event_.NotifyOn(closure);
+}
+
+void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
+ read_event_.NotifyOn(closure);
+}
+
+void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
+ write_event_.NotifyOn(closure);
+}
+
+void CFStreamHandle::Shutdown(grpc_error* error) {
+ open_event_.SetShutdown(GRPC_ERROR_REF(error));
+ read_event_.SetShutdown(GRPC_ERROR_REF(error));
+ write_event_.SetShutdown(GRPC_ERROR_REF(error));
+ GRPC_ERROR_UNREF(error);
+}
+
+void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
+ reason, val, val + 1);
+ }
+ gpr_ref(&refcount_);
+}
+
+void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
+ gpr_log(GPR_ERROR,
+ "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
+ reason, val, val - 1);
+ }
+ if (gpr_unref(&refcount_)) {
+ delete this;
+ }
+}
+
+#endif
diff --git a/src/core/lib/iomgr/cfstream_handle.h b/src/core/lib/iomgr/cfstream_handle.h
new file mode 100644
index 0000000000..4258e72431
--- /dev/null
+++ b/src/core/lib/iomgr/cfstream_handle.h
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* The CFStream handle acts as an event synchronization entity for
+ * read/write/open/error/eos events happening on CFStream streams. */
+
+#ifndef GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H
+#define GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM
+#import <CoreFoundation/CoreFoundation.h>
+
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/lockfree_event.h"
+
+class CFStreamHandle final {
+ public:
+ static CFStreamHandle* CreateStreamHandle(CFReadStreamRef read_stream,
+ CFWriteStreamRef write_stream);
+ ~CFStreamHandle();
+ CFStreamHandle(const CFReadStreamRef& ref) = delete;
+ CFStreamHandle(CFReadStreamRef&& ref) = delete;
+ CFStreamHandle& operator=(const CFStreamHandle& rhs) = delete;
+
+ void NotifyOnOpen(grpc_closure* closure);
+ void NotifyOnRead(grpc_closure* closure);
+ void NotifyOnWrite(grpc_closure* closure);
+ void Shutdown(grpc_error* error);
+
+ void Ref(const char* file = "", int line = 0, const char* reason = nullptr);
+ void Unref(const char* file = "", int line = 0, const char* reason = nullptr);
+
+ private:
+ CFStreamHandle(CFReadStreamRef read_stream, CFWriteStreamRef write_stream);
+ static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type,
+ void* client_callback_info);
+ static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type,
+ void* client_callback_info);
+ static void* Retain(void* info);
+ static void Release(void* info);
+
+ grpc_core::LockfreeEvent open_event_;
+ grpc_core::LockfreeEvent read_event_;
+ grpc_core::LockfreeEvent write_event_;
+
+ gpr_refcount refcount_;
+};
+
+#ifdef DEBUG
+#define CFSTREAM_HANDLE_REF(handle, reason) \
+ (handle)->Ref(__FILE__, __LINE__, (reason))
+#define CFSTREAM_HANDLE_UNREF(handle, reason) \
+ (handle)->Unref(__FILE__, __LINE__, (reason))
+#else
+#define CFSTREAM_HANDLE_REF(handle, reason) (handle)->Ref()
+#define CFSTREAM_HANDLE_UNREF(handle, reason) (handle)->Unref()
+#endif
+
+#endif
+
+#endif /* GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H */
diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc
new file mode 100644
index 0000000000..c3bc0cc8fd
--- /dev/null
+++ b/src/core/lib/iomgr/endpoint_cfstream.cc
@@ -0,0 +1,372 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM_ENDPOINT
+
+#import <CoreFoundation/CoreFoundation.h>
+#import "src/core/lib/iomgr/endpoint_cfstream.h"
+
+#include <grpc/slice_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/iomgr/cfstream_handle.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/error_cfstream.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+typedef struct {
+ grpc_endpoint base;
+ gpr_refcount refcount;
+
+ CFReadStreamRef read_stream;
+ CFWriteStreamRef write_stream;
+ CFStreamHandle* stream_sync;
+
+ grpc_closure* read_cb;
+ grpc_closure* write_cb;
+ grpc_slice_buffer* read_slices;
+ grpc_slice_buffer* write_slices;
+
+ grpc_closure read_action;
+ grpc_closure write_action;
+
+ char* peer_string;
+ grpc_resource_user* resource_user;
+ grpc_resource_user_slice_allocator slice_allocator;
+} CFStreamEndpoint;
+
+static void CFStreamFree(CFStreamEndpoint* ep) {
+ grpc_resource_user_unref(ep->resource_user);
+ CFRelease(ep->read_stream);
+ CFRelease(ep->write_stream);
+ CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free");
+ gpr_free(ep->peer_string);
+ gpr_free(ep);
+}
+
+#ifndef NDEBUG
+#define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__)
+#define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__)
+static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason,
+ const char* file, int line) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "CFStream endpoint unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
+ reason, val, val - 1);
+ }
+ if (gpr_unref(&ep->refcount)) {
+ CFStreamFree(ep);
+ }
+}
+static void CFStreamRef(CFStreamEndpoint* ep, const char* reason,
+ const char* file, int line) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "CFStream endpoint ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
+ reason, val, val + 1);
+ }
+ gpr_ref(&ep->refcount);
+}
+#else
+#define EP_REF(ep, reason) CFStreamRef((ep))
+#define EP_UNREF(ep, reason) CFStreamUnref((ep))
+static void CFStreamUnref(CFStreamEndpoint* ep) {
+ if (gpr_unref(&ep->refcount)) {
+ CFStreamFree(ep);
+ }
+}
+static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); }
+#endif
+
+static grpc_error* CFStreamAnnotateError(grpc_error* src_error,
+ CFStreamEndpoint* ep) {
+ return grpc_error_set_str(
+ grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE),
+ GRPC_ERROR_STR_TARGET_ADDRESS,
+ grpc_slice_from_copied_string(ep->peer_string));
+}
+
+static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_read_cb %p %p:%p", ep,
+ ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg);
+ size_t i;
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "read: error=%s", str);
+
+ for (i = 0; i < ep->read_slices->count; i++) {
+ char* dump = grpc_dump_slice(ep->read_slices->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string, dump);
+ gpr_free(dump);
+ }
+ }
+ grpc_closure* cb = ep->read_cb;
+ ep->read_cb = nullptr;
+ ep->read_slices = nullptr;
+ GRPC_CLOSURE_SCHED(cb, error);
+}
+
+static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_write_cb %p %p:%p", ep,
+ ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg);
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "write: error=%s", str);
+ }
+ grpc_closure* cb = ep->write_cb;
+ ep->write_cb = nullptr;
+ ep->write_slices = nullptr;
+ GRPC_CLOSURE_SCHED(cb, error);
+}
+
+static void ReadAction(void* arg, grpc_error* error) {
+ CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
+ GPR_ASSERT(ep->read_cb != nullptr);
+ if (error) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
+ CallReadCb(ep, GRPC_ERROR_REF(error));
+ EP_UNREF(ep, "read");
+ return;
+ }
+
+ GPR_ASSERT(ep->read_slices->count == 1);
+ grpc_slice slice = ep->read_slices->slices[0];
+ size_t len = GRPC_SLICE_LENGTH(slice);
+ CFIndex read_size =
+ CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len);
+ if (read_size == -1) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
+ CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream);
+ if (stream_error != nullptr) {
+ error = CFStreamAnnotateError(
+ GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), ep);
+ CFRelease(stream_error);
+ } else {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error");
+ }
+ CallReadCb(ep, error);
+ EP_UNREF(ep, "read");
+ } else if (read_size == 0) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
+ CallReadCb(ep,
+ CFStreamAnnotateError(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep));
+ EP_UNREF(ep, "read");
+ } else {
+ if (read_size < len) {
+ grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr);
+ }
+ CallReadCb(ep, GRPC_ERROR_NONE);
+ EP_UNREF(ep, "read");
+ }
+}
+
+static void WriteAction(void* arg, grpc_error* error) {
+ CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
+ GPR_ASSERT(ep->write_cb != nullptr);
+ if (error) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
+ CallWriteCb(ep, GRPC_ERROR_REF(error));
+ EP_UNREF(ep, "write");
+ return;
+ }
+
+ grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices);
+ size_t slice_len = GRPC_SLICE_LENGTH(slice);
+ CFIndex write_size = CFWriteStreamWrite(
+ ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
+ if (write_size == -1) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
+ CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream);
+ if (stream_error != nullptr) {
+ error = CFStreamAnnotateError(
+ GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), ep);
+ CFRelease(stream_error);
+ } else {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed.");
+ }
+ CallWriteCb(ep, error);
+ EP_UNREF(ep, "write");
+ } else {
+ if (write_size < GRPC_SLICE_LENGTH(slice)) {
+ grpc_slice_buffer_undo_take_first(
+ ep->write_slices, grpc_slice_sub(slice, write_size, slice_len));
+ }
+ if (ep->write_slices->length > 0) {
+ ep->stream_sync->NotifyOnWrite(&ep->write_action);
+ } else {
+ CallWriteCb(ep, GRPC_ERROR_NONE);
+ EP_UNREF(ep, "write");
+ }
+
+ if (grpc_tcp_trace.enabled()) {
+ grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size);
+ char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string, dump);
+ gpr_free(dump);
+ grpc_slice_unref_internal(trace_slice);
+ }
+ }
+ grpc_slice_unref_internal(slice);
+}
+
+static void CFStreamReadAllocationDone(void* arg, grpc_error* error) {
+ CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
+ if (error == GRPC_ERROR_NONE) {
+ ep->stream_sync->NotifyOnRead(&ep->read_action);
+ } else {
+ grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
+ CallReadCb(ep, error);
+ EP_UNREF(ep, "read");
+ }
+}
+
+static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
+ grpc_closure* cb) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl,
+ slices, cb, slices->length);
+ }
+ GPR_ASSERT(ep_impl->read_cb == nullptr);
+ ep_impl->read_cb = cb;
+ ep_impl->read_slices = slices;
+ grpc_slice_buffer_reset_and_unref_internal(slices);
+ grpc_resource_user_alloc_slices(&ep_impl->slice_allocator,
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
+ ep_impl->read_slices);
+ EP_REF(ep_impl, "read");
+}
+
+static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
+ grpc_closure* cb) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",
+ ep_impl, slices, cb, slices->length);
+ }
+ GPR_ASSERT(ep_impl->write_cb == nullptr);
+ ep_impl->write_cb = cb;
+ ep_impl->write_slices = slices;
+ EP_REF(ep_impl, "write");
+ ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action);
+}
+
+void CFStreamShutdown(grpc_endpoint* ep, grpc_error* why) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%p)", ep_impl, why);
+ }
+ CFReadStreamClose(ep_impl->read_stream);
+ CFWriteStreamClose(ep_impl->write_stream);
+ ep_impl->stream_sync->Shutdown(why);
+ grpc_resource_user_shutdown(ep_impl->resource_user);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%p)", ep_impl, why);
+ }
+}
+
+void CFStreamDestroy(grpc_endpoint* ep) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl);
+ }
+ EP_UNREF(ep_impl, "destroy");
+}
+
+grpc_resource_user* CFStreamGetResourceUser(grpc_endpoint* ep) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ return ep_impl->resource_user;
+}
+
+char* CFStreamGetPeer(grpc_endpoint* ep) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ return gpr_strdup(ep_impl->peer_string);
+}
+
+int CFStreamGetFD(grpc_endpoint* ep) { return 0; }
+
+void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
+void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {}
+void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep,
+ grpc_pollset_set* pollset) {}
+
+static const grpc_endpoint_vtable vtable = {CFStreamRead,
+ CFStreamWrite,
+ CFStreamAddToPollset,
+ CFStreamAddToPollsetSet,
+ CFStreamDeleteFromPollsetSet,
+ CFStreamShutdown,
+ CFStreamDestroy,
+ CFStreamGetResourceUser,
+ CFStreamGetPeer,
+ CFStreamGetFD};
+
+grpc_endpoint* grpc_cfstream_endpoint_create(
+ CFReadStreamRef read_stream, CFWriteStreamRef write_stream,
+ const char* peer_string, grpc_resource_quota* resource_quota,
+ CFStreamHandle* stream_sync) {
+ CFStreamEndpoint* ep_impl =
+ static_cast<CFStreamEndpoint*>(gpr_malloc(sizeof(CFStreamEndpoint)));
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "CFStream endpoint:%p create readStream:%p writeStream: %p",
+ ep_impl, read_stream, write_stream);
+ }
+ ep_impl->base.vtable = &vtable;
+ gpr_ref_init(&ep_impl->refcount, 1);
+ ep_impl->read_stream = read_stream;
+ ep_impl->write_stream = write_stream;
+ CFRetain(read_stream);
+ CFRetain(write_stream);
+ ep_impl->stream_sync = stream_sync;
+ CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create");
+
+ ep_impl->peer_string = gpr_strdup(peer_string);
+ ep_impl->read_cb = nil;
+ ep_impl->write_cb = nil;
+ ep_impl->read_slices = nil;
+ ep_impl->write_slices = nil;
+ GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction,
+ static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction,
+ static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
+ ep_impl->resource_user =
+ grpc_resource_user_create(resource_quota, peer_string);
+ grpc_resource_user_slice_allocator_init(&ep_impl->slice_allocator,
+ ep_impl->resource_user,
+ CFStreamReadAllocationDone, ep_impl);
+
+ return &ep_impl->base;
+}
+
+#endif /* GRPC_CFSTREAM_ENDPOINT */
diff --git a/src/core/lib/iomgr/endpoint_cfstream.h b/src/core/lib/iomgr/endpoint_cfstream.h
new file mode 100644
index 0000000000..ef957c1f11
--- /dev/null
+++ b/src/core/lib/iomgr/endpoint_cfstream.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H
+#define GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H
+/*
+ Low level TCP "bottom half" implementation, for use by transports built on
+ top of a TCP connection.
+
+ Note that this file does not (yet) include APIs for creating the socket in
+ the first place.
+
+ All calls passing slice transfer ownership of a slice refcount unless
+ otherwise specified.
+*/
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GRPC_CFSTREAM
+
+#import <CoreFoundation/CoreFoundation.h>
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/cfstream_handle.h"
+#include "src/core/lib/iomgr/endpoint.h"
+
+grpc_endpoint* grpc_cfstream_endpoint_create(
+ CFReadStreamRef read_stream, CFWriteStreamRef write_stream,
+ const char* peer_string, grpc_resource_quota* resource_quota,
+ CFStreamHandle* stream_sync);
+
+#endif /* GRPC_CFSTREAM */
+
+#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H */
diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc
index 4088cf612e..90ed34da11 100644
--- a/src/core/lib/iomgr/error.cc
+++ b/src/core/lib/iomgr/error.cc
@@ -312,6 +312,12 @@ static void internal_add_error(grpc_error** err, grpc_error* new_err) {
// It is very common to include and extra int and string in an error
#define SURPLUS_CAPACITY (2 * SLOTS_PER_INT + SLOTS_PER_TIME)
+static bool g_error_creation_allowed = true;
+
+void grpc_disable_error_creation() { g_error_creation_allowed = false; }
+
+void grpc_enable_error_creation() { g_error_creation_allowed = true; }
+
grpc_error* grpc_error_create(const char* file, int line, grpc_slice desc,
grpc_error** referencing,
size_t num_referencing) {
@@ -326,6 +332,12 @@ grpc_error* grpc_error_create(const char* file, int line, grpc_slice desc,
return GRPC_ERROR_OOM;
}
#ifndef NDEBUG
+ if (!g_error_creation_allowed) {
+ gpr_log(GPR_ERROR,
+ "Error creation occurred when error creation was disabled [%s:%d]",
+ file, line);
+ abort();
+ }
if (grpc_trace_error_refcount.enabled()) {
gpr_log(GPR_DEBUG, "%p create [%s:%d]", err, file, line);
}
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index f8cae4da82..27c4d22fd1 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -123,6 +123,11 @@ typedef enum {
#define GRPC_ERROR_OOM ((grpc_error*)2)
#define GRPC_ERROR_CANCELLED ((grpc_error*)4)
+// debug only toggles that allow for a sanity to check that ensures we will
+// never create any errors in the per-RPC hotpath.
+void grpc_disable_error_creation();
+void grpc_enable_error_creation();
+
const char* grpc_error_string(grpc_error* error);
/// Create an error - but use GRPC_ERROR_CREATE instead
diff --git a/src/core/lib/iomgr/error_cfstream.cc b/src/core/lib/iomgr/error_cfstream.cc
new file mode 100644
index 0000000000..d7af8c377f
--- /dev/null
+++ b/src/core/lib/iomgr/error_cfstream.cc
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GRPC_CFSTREAM
+#include <CoreFoundation/CoreFoundation.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/iomgr/error.h"
+
+#define MAX_ERROR_DESCRIPTION 256
+
+grpc_error* grpc_error_create_from_cferror(const char* file, int line,
+ void* arg, const char* custom_desc) {
+ CFErrorRef error = static_cast<CFErrorRef>(arg);
+ char buf_domain[MAX_ERROR_DESCRIPTION];
+ char buf_desc[MAX_ERROR_DESCRIPTION];
+ char* error_msg;
+ CFErrorDomain domain = CFErrorGetDomain((error));
+ CFIndex code = CFErrorGetCode((error));
+ CFStringRef desc = CFErrorCopyDescription((error));
+ CFStringGetCString(domain, buf_domain, MAX_ERROR_DESCRIPTION,
+ kCFStringEncodingUTF8);
+ CFStringGetCString(desc, buf_desc, MAX_ERROR_DESCRIPTION,
+ kCFStringEncodingUTF8);
+ gpr_asprintf(&error_msg, "%s (error domain:%s, code:%ld, description:%s)",
+ custom_desc, buf_domain, code, buf_desc);
+ CFRelease(desc);
+ grpc_error* return_error = grpc_error_create(
+ file, line, grpc_slice_from_copied_string(error_msg), NULL, 0);
+ gpr_free(error_msg);
+ return return_error;
+}
+#endif /* GRPC_CFSTREAM */
diff --git a/src/core/lib/iomgr/error_cfstream.h b/src/core/lib/iomgr/error_cfstream.h
new file mode 100644
index 0000000000..06ab751329
--- /dev/null
+++ b/src/core/lib/iomgr/error_cfstream.h
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_ERROR_CFSTREAM_H
+#define GRPC_CORE_LIB_IOMGR_ERROR_CFSTREAM_H
+
+#ifdef GRPC_CFSTREAM
+// Create an error from Apple Core Foundation CFError object
+#define GRPC_ERROR_CREATE_FROM_CFERROR(error, desc) \
+ grpc_error_create_from_cferror(__FILE__, __LINE__, \
+ static_cast<void*>((error)), (desc))
+grpc_error* grpc_error_create_from_cferror(const char* file, int line,
+ void* arg, const char* desc);
+#endif /* GRPC_CFSTREAM */
+
+#endif /* GRPC_CORE_LIB_IOMGR_ERROR_CFSTREAM_H */
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 554522598c..98ab974057 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -1264,12 +1264,12 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
}
#else /* defined(GRPC_LINUX_EPOLL) */
-#if defined(GRPC_POSIX_SOCKET)
+#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
* NULL */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
return nullptr;
}
-#endif /* defined(GRPC_POSIX_SOCKET) */
+#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
#endif /* !defined(GRPC_LINUX_EPOLL) */
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index be400c6ee9..f22eb8ee87 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -1586,7 +1586,7 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
}
#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
-#if defined(GRPC_POSIX_SOCKET)
+#if defined(GRPC_POSIX_SOCKET_EV_EPOLLEX)
#include "src/core/lib/iomgr/ev_epollex_linux.h"
/* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means
epoll_create1 is not available. Return NULL */
@@ -1594,6 +1594,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
bool explicitly_requested) {
return nullptr;
}
-#endif /* defined(GRPC_POSIX_SOCKET) */
+#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLLEX) */
#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 6c0f0c91f3..20512e31ac 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -1750,7 +1750,7 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux(
}
#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
-#if defined(GRPC_POSIX_SOCKET)
+#if defined(GRPC_POSIX_SOCKET_EV_EPOLLSIG)
#include "src/core/lib/iomgr/ev_epollsig_linux.h"
/* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means
epoll_create1 is not available. Return NULL */
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 06378bba25..df6b0e1e89 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_EV_POLL
#include "src/core/lib/iomgr/ev_poll_posix.h"
@@ -1769,4 +1769,4 @@ const grpc_event_engine_vtable* grpc_init_poll_cv_posix(bool explicit_request) {
return &vtable;
}
-#endif
+#endif /* GRPC_POSIX_SOCKET_EV_POLL */
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index efa91045d4..82b21df7ba 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_EV
#include "src/core/lib/iomgr/ev_posix.h"
@@ -343,4 +343,4 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
g_event_engine->pollset_set_del_fd(pollset_set, fd);
}
-#endif // GRPC_POSIX_SOCKET
+#endif // GRPC_POSIX_SOCKET_EV
diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc
index 66c9cb7ff7..ca7334c9a4 100644
--- a/src/core/lib/iomgr/iomgr_posix.cc
+++ b/src/core/lib/iomgr/iomgr_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_IOMGR
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
@@ -64,4 +64,4 @@ void grpc_set_default_iomgr_platform() {
grpc_set_iomgr_platform_vtable(&vtable);
}
-#endif /* GRPC_POSIX_SOCKET */
+#endif /* GRPC_POSIX_SOCKET_IOMGR */
diff --git a/src/core/lib/iomgr/polling_entity.cc b/src/core/lib/iomgr/polling_entity.cc
index 9f164f65b0..dea07cae53 100644
--- a/src/core/lib/iomgr/polling_entity.cc
+++ b/src/core/lib/iomgr/polling_entity.cc
@@ -61,8 +61,11 @@ bool grpc_polling_entity_is_empty(const grpc_polling_entity* pollent) {
void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent,
grpc_pollset_set* pss_dst) {
if (pollent->tag == GRPC_POLLS_POLLSET) {
- GPR_ASSERT(pollent->pollent.pollset != nullptr);
- grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset);
+ // CFStream does not use file destriptors. When CFStream is used, the fd
+ // pollset is possible to be null.
+ if (pollent->pollent.pollset != nullptr) {
+ grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset);
+ }
} else if (pollent->tag == GRPC_POLLS_POLLSET_SET) {
GPR_ASSERT(pollent->pollent.pollset_set != nullptr);
grpc_pollset_set_add_pollset_set(pss_dst, pollent->pollent.pollset_set);
@@ -75,8 +78,14 @@ void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent,
void grpc_polling_entity_del_from_pollset_set(grpc_polling_entity* pollent,
grpc_pollset_set* pss_dst) {
if (pollent->tag == GRPC_POLLS_POLLSET) {
+#ifdef GRPC_CFSTREAM
+ if (pollent->pollent.pollset != nullptr) {
+ grpc_pollset_set_del_pollset(pss_dst, pollent->pollent.pollset);
+ }
+#else
GPR_ASSERT(pollent->pollent.pollset != nullptr);
grpc_pollset_set_del_pollset(pss_dst, pollent->pollent.pollset);
+#endif
} else if (pollent->tag == GRPC_POLLS_POLLSET_SET) {
GPR_ASSERT(pollent->pollent.pollset_set != nullptr);
grpc_pollset_set_del_pollset_set(pss_dst, pollent->pollent.pollset_set);
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index a397012003..80d8e63cdd 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -97,7 +97,26 @@
#define GRPC_MSG_IOVLEN_TYPE int
#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#ifdef GRPC_CFSTREAM
+#define GRPC_POSIX_SOCKET_IOMGR 1
+#define GRPC_CFSTREAM_ENDPOINT 1
+#define GRPC_CFSTREAM_CLIENT 1
+#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1
+#define GRPC_POSIX_SOCKET_EV 1
+#define GRPC_POSIX_SOCKET_EV_EPOLL1 1
+#define GRPC_POSIX_SOCKET_EV_EPOLLEX 1
+#define GRPC_POSIX_SOCKET_EV_EPOLLSIG 1
+#define GRPC_POSIX_SOCKET_EV_POLL 1
+#define GRPC_POSIX_SOCKET_RESOLVE_ADDRESS 1
+#define GRPC_POSIX_SOCKET_SOCKADDR 1
+#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1
+#define GRPC_POSIX_SOCKET_TCP 1
+#define GRPC_POSIX_SOCKET_TCP_SERVER 1
+#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1
+#define GRPC_POSIX_SOCKET_UTILS_COMMON 1
+#else
#define GRPC_POSIX_SOCKET 1
+#endif
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_SYSCONF 1
#define GRPC_POSIX_WAKEUP_FD 1
@@ -131,12 +150,30 @@
#endif
#if defined(GRPC_POSIX_SOCKET) + defined(GRPC_WINSOCK_SOCKET) + \
- defined(GRPC_CUSTOM_SOCKET) != \
+ defined(GRPC_CUSTOM_SOCKET) + defined(GRPC_CFSTREAM) != \
1
#error \
"Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GRPC_CUSTOM_SOCKET"
#endif
+#ifdef GRPC_POSIX_SOCKET
+#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1
+#define GRPC_POSIX_SOCKET_EV 1
+#define GRPC_POSIX_SOCKET_EV_EPOLLEX 1
+#define GRPC_POSIX_SOCKET_EV_EPOLLSIG 1
+#define GRPC_POSIX_SOCKET_EV_POLL 1
+#define GRPC_POSIX_SOCKET_EV_EPOLL1 1
+#define GRPC_POSIX_SOCKET_IOMGR 1
+#define GRPC_POSIX_SOCKET_RESOLVE_ADDRESS 1
+#define GRPC_POSIX_SOCKET_SOCKADDR 1
+#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1
+#define GRPC_POSIX_SOCKET_TCP 1
+#define GRPC_POSIX_SOCKET_TCP_CLIENT 1
+#define GRPC_POSIX_SOCKET_TCP_SERVER 1
+#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1
+#define GRPC_POSIX_SOCKET_UTILS_COMMON 1
+#endif
+
#if defined(GRPC_POSIX_HOST_NAME_MAX) && defined(GRPC_POSIX_SYSCONF)
#error "Cannot define both GRPC_POSIX_HOST_NAME_MAX and GRPC_POSIX_SYSCONF"
#endif
diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h
index fe0d834582..6afe94a7a9 100644
--- a/src/core/lib/iomgr/resolve_address.h
+++ b/src/core/lib/iomgr/resolve_address.h
@@ -33,7 +33,7 @@
#include <ws2tcpip.h>
#endif
-#ifdef GRPC_POSIX_SOCKET
+#if defined(GRPC_POSIX_SOCKET) || defined(GRPC_CFSTREAM)
#include <sys/socket.h>
#endif
diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc
index a82075542f..7a825643e1 100644
--- a/src/core/lib/iomgr/resolve_address_posix.cc
+++ b/src/core/lib/iomgr/resolve_address_posix.cc
@@ -19,7 +19,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_RESOLVE_ADDRESS
#include "src/core/lib/iomgr/sockaddr.h"
diff --git a/src/core/lib/iomgr/sockaddr_posix.h b/src/core/lib/iomgr/sockaddr_posix.h
index 5b18bbc465..3cedd9082d 100644
--- a/src/core/lib/iomgr/sockaddr_posix.h
+++ b/src/core/lib/iomgr/sockaddr_posix.h
@@ -23,7 +23,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_SOCKADDR
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
diff --git a/src/core/lib/iomgr/socket_factory_posix.cc b/src/core/lib/iomgr/socket_factory_posix.cc
index 1d1e36c0e3..57137769c8 100644
--- a/src/core/lib/iomgr/socket_factory_posix.cc
+++ b/src/core/lib/iomgr/socket_factory_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_SOCKET_FACTORY
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/useful.h"
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc
index 04a1767731..caee652307 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.cc
+++ b/src/core/lib/iomgr/socket_utils_common_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc
new file mode 100644
index 0000000000..ffed3bbef6
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_client_cfstream.cc
@@ -0,0 +1,216 @@
+
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM_CLIENT
+
+#include <CoreFoundation/CoreFoundation.h>
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+
+#include <netinet/in.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/host_port.h"
+#include "src/core/lib/iomgr/cfstream_handle.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/endpoint_cfstream.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/error_cfstream.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/timer.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+typedef struct CFStreamConnect {
+ gpr_mu mu;
+ gpr_refcount refcount;
+
+ CFReadStreamRef read_stream;
+ CFWriteStreamRef write_stream;
+ CFStreamHandle* stream_sync;
+
+ grpc_timer alarm;
+ grpc_closure on_alarm;
+ grpc_closure on_open;
+
+ bool read_stream_open;
+ bool write_stream_open;
+ bool failed;
+
+ grpc_closure* closure;
+ grpc_endpoint** endpoint;
+ int refs;
+ char* addr_name;
+ grpc_resource_quota* resource_quota;
+} CFStreamConnect;
+
+static void CFStreamConnectCleanup(CFStreamConnect* connect) {
+ grpc_resource_quota_unref_internal(connect->resource_quota);
+ CFSTREAM_HANDLE_UNREF(connect->stream_sync, "async connect clean up");
+ CFRelease(connect->read_stream);
+ CFRelease(connect->write_stream);
+ gpr_mu_destroy(&connect->mu);
+ gpr_free(connect->addr_name);
+ gpr_free(connect);
+}
+
+static void OnAlarm(void* arg, grpc_error* error) {
+ CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%p", connect, error);
+ }
+ gpr_mu_lock(&connect->mu);
+ grpc_closure* closure = connect->closure;
+ connect->closure = nil;
+ const bool done = (--connect->refs == 0);
+ gpr_mu_unlock(&connect->mu);
+ // Only schedule a callback once, by either OnAlarm or OnOpen. The
+ // first one issues callback while the second one does cleanup.
+ if (done) {
+ CFStreamConnectCleanup(connect);
+ } else {
+ grpc_error* error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out");
+ GRPC_CLOSURE_SCHED(closure, error);
+ }
+}
+
+static void OnOpen(void* arg, grpc_error* error) {
+ CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%p", connect, error);
+ }
+ gpr_mu_lock(&connect->mu);
+ grpc_timer_cancel(&connect->alarm);
+ grpc_closure* closure = connect->closure;
+ connect->closure = nil;
+
+ bool done = (--connect->refs == 0);
+ grpc_endpoint** endpoint = connect->endpoint;
+
+ // Only schedule a callback once, by either OnAlarm or OnOpen. The
+ // first one issues callback while the second one does cleanup.
+ if (done) {
+ gpr_mu_unlock(&connect->mu);
+ CFStreamConnectCleanup(connect);
+ } else {
+ if (error == GRPC_ERROR_NONE) {
+ CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream);
+ if (stream_error == NULL) {
+ stream_error = CFWriteStreamCopyError(connect->write_stream);
+ }
+ if (stream_error) {
+ error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error");
+ CFRelease(stream_error);
+ }
+ if (error == GRPC_ERROR_NONE) {
+ *endpoint = grpc_cfstream_endpoint_create(
+ connect->read_stream, connect->write_stream, connect->addr_name,
+ connect->resource_quota, connect->stream_sync);
+ }
+ } else {
+ GRPC_ERROR_REF(error);
+ }
+ gpr_mu_unlock(&connect->mu);
+ GRPC_CLOSURE_SCHED(closure, error);
+ }
+}
+
+static void ParseResolvedAddress(const grpc_resolved_address* addr,
+ CFStringRef* host, int* port) {
+ char *host_port, *host_string, *port_string;
+ grpc_sockaddr_to_string(&host_port, addr, 1);
+ gpr_split_host_port(host_port, &host_string, &port_string);
+ *host = CFStringCreateWithCString(NULL, host_string, kCFStringEncodingUTF8);
+ gpr_free(host_string);
+ gpr_free(port_string);
+ gpr_free(host_port);
+ *port = grpc_sockaddr_get_port(addr);
+}
+
+static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
+ grpc_pollset_set* interested_parties,
+ const grpc_channel_args* channel_args,
+ const grpc_resolved_address* resolved_addr,
+ grpc_millis deadline) {
+ CFStreamConnect* connect;
+
+ connect = (CFStreamConnect*)gpr_zalloc(sizeof(CFStreamConnect));
+ connect->closure = closure;
+ connect->endpoint = ep;
+ connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
+ // connect->resource_quota = resource_quota;
+ connect->refs = 2; // One for the connect operation, one for the timer.
+ gpr_ref_init(&connect->refcount, 1);
+ gpr_mu_init(&connect->mu);
+
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
+ connect->addr_name);
+ }
+
+ grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
+ if (channel_args != NULL) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
+ grpc_resource_quota_unref_internal(resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
+ (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
+ }
+ }
+ }
+ connect->resource_quota = resource_quota;
+
+ CFReadStreamRef read_stream;
+ CFWriteStreamRef write_stream;
+
+ CFStringRef host;
+ int port;
+ ParseResolvedAddress(resolved_addr, &host, &port);
+ CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream,
+ &write_stream);
+ CFRelease(host);
+ connect->read_stream = read_stream;
+ connect->write_stream = write_stream;
+ connect->stream_sync =
+ CFStreamHandle::CreateStreamHandle(read_stream, write_stream);
+ GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect),
+ grpc_schedule_on_exec_ctx);
+ connect->stream_sync->NotifyOnOpen(&connect->on_open);
+ GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect,
+ grpc_schedule_on_exec_ctx);
+ gpr_mu_lock(&connect->mu);
+ CFReadStreamOpen(read_stream);
+ CFWriteStreamOpen(write_stream);
+ grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
+ gpr_mu_unlock(&connect->mu);
+}
+
+grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {CFStreamClientConnect};
+
+#endif /* GRPC_CFSTREAM_CLIENT */
diff --git a/src/core/lib/iomgr/tcp_client_custom.cc b/src/core/lib/iomgr/tcp_client_custom.cc
index 932c79ea0b..9389861d07 100644
--- a/src/core/lib/iomgr/tcp_client_custom.cc
+++ b/src/core/lib/iomgr/tcp_client_custom.cc
@@ -140,12 +140,12 @@ static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
socket, connect->addr_name);
}
- grpc_custom_socket_vtable->connect(
- socket, (const grpc_sockaddr*)resolved_addr->addr, resolved_addr->len,
- custom_connect_callback);
GRPC_CLOSURE_INIT(&connect->on_alarm, on_alarm, socket,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
+ grpc_custom_socket_vtable->connect(
+ socket, (const grpc_sockaddr*)resolved_addr->addr, resolved_addr->len,
+ custom_connect_callback);
}
grpc_tcp_client_vtable custom_tcp_client_vtable = {tcp_connect};
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 1b43258a91..71e08f1230 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
#include "src/core/lib/iomgr/tcp_client_posix.h"
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index b79ffe20f1..43d545846d 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_TCP
#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/tcp_posix.h"
@@ -70,7 +70,9 @@ struct grpc_tcp {
grpc_endpoint base;
grpc_fd* em_fd;
int fd;
- bool finished_edge;
+ /* Used by the endpoint read function to distinguish the very first read call
+ * from the rest */
+ bool is_first_read;
double target_length;
double bytes_read_this_round;
gpr_refcount refcount;
@@ -377,7 +379,6 @@ static void tcp_do_read(grpc_tcp* tcp) {
ssize_t read_bytes;
size_t i;
- GPR_ASSERT(!tcp->finished_edge);
GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
for (i = 0; i < tcp->incoming_buffer->count; i++) {
@@ -473,7 +474,6 @@ static void tcp_continue_read(grpc_tcp* tcp) {
static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) {
grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
- GPR_ASSERT(!tcp->finished_edge);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp, grpc_error_string(error));
}
@@ -497,10 +497,17 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
grpc_slice_buffer_reset_and_unref_internal(incoming_buffer);
grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
- if (tcp->finished_edge) {
- tcp->finished_edge = false;
+ if (tcp->is_first_read) {
+ /* Endpoint read called for the very first time. Register read callback with
+ * the polling engine */
+ tcp->is_first_read = false;
notify_on_read(tcp);
} else {
+ /* Not the first time. We may or may not have more bytes available. In any
+ * case call tcp->read_done_closure (i.e tcp_handle_read()) which does the
+ * right thing (i.e calls tcp_do_read() which either reads the available
+ * bytes or calls notify_on_read() to be notified when new bytes become
+ * available */
GRPC_CLOSURE_SCHED(&tcp->read_done_closure, GRPC_ERROR_NONE);
}
}
@@ -778,7 +785,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->min_read_chunk_size = tcp_min_read_chunk_size;
tcp->max_read_chunk_size = tcp_max_read_chunk_size;
tcp->bytes_read_this_round = 0;
- tcp->finished_edge = true;
+ /* Will be set to false by the very first endpoint read function */
+ tcp->is_first_read = true;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
@@ -811,4 +819,4 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
TCP_UNREF(tcp, "destroy");
}
-#endif
+#endif /* GRPC_POSIX_SOCKET_TCP */
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index b39a0754ba..ce18ff99e6 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -25,7 +25,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_TCP_SERVER
#include "src/core/lib/iomgr/tcp_server.h"
@@ -559,4 +559,4 @@ grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
tcp_server_shutdown_starting_add,
tcp_server_unref,
tcp_server_shutdown_listeners};
-#endif
+#endif /* GRPC_POSIX_SOCKET_TCP_SERVER */
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index f21f4ad015..b9f8145572 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
@@ -217,4 +217,4 @@ error:
return ret;
}
-#endif /* GRPC_POSIX_SOCKET */
+#endif /* GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON */
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
index 2129029737..43dd68e874 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
@@ -219,9 +219,11 @@ static void on_oauth2_token_fetcher_http_response(void* user_data,
gpr_mu_lock(&c->mu);
c->token_fetch_pending = false;
c->access_token_md = GRPC_MDELEM_REF(access_token_md);
- c->token_expiration = status == GRPC_CREDENTIALS_OK
- ? grpc_core::ExecCtx::Get()->Now() + token_lifetime
- : 0;
+ c->token_expiration =
+ status == GRPC_CREDENTIALS_OK
+ ? gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(token_lifetime, GPR_TIMESPAN))
+ : gpr_inf_past(GPR_CLOCK_MONOTONIC);
grpc_oauth2_pending_get_request_metadata* pending_request =
c->pending_requests;
c->pending_requests = nullptr;
@@ -259,8 +261,10 @@ static bool oauth2_token_fetcher_get_request_metadata(
grpc_mdelem cached_access_token_md = GRPC_MDNULL;
gpr_mu_lock(&c->mu);
if (!GRPC_MDISNULL(c->access_token_md) &&
- (c->token_expiration - grpc_core::ExecCtx::Get()->Now() >
- refresh_threshold)) {
+ gpr_time_cmp(
+ gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_MONOTONIC)),
+ gpr_time_from_seconds(GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS,
+ GPR_TIMESPAN)) > 0) {
cached_access_token_md = GRPC_MDELEM_REF(c->access_token_md);
}
if (!GRPC_MDISNULL(cached_access_token_md)) {
@@ -333,7 +337,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials* c,
c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2;
gpr_ref_init(&c->base.refcount, 1);
gpr_mu_init(&c->mu);
- c->token_expiration = 0;
+ c->token_expiration = gpr_inf_past(GPR_CLOCK_MONOTONIC);
c->fetch_func = fetch_func;
c->pollent =
grpc_polling_entity_create_from_pollset_set(grpc_pollset_set_create());
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
index c0dd1546e3..12a1d4484f 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
@@ -71,7 +71,7 @@ typedef struct {
grpc_call_credentials base;
gpr_mu mu;
grpc_mdelem access_token_md;
- grpc_millis token_expiration;
+ gpr_timespec token_expiration;
bool token_fetch_pending;
grpc_oauth2_pending_get_request_metadata* pending_requests;
grpc_httpcli_context httpcli_context;
diff --git a/src/core/lib/security/util/json_util.cc b/src/core/lib/security/util/json_util.cc
index 75512a19c9..fe9f5fe3d3 100644
--- a/src/core/lib/security/util/json_util.cc
+++ b/src/core/lib/security/util/json_util.cc
@@ -29,6 +29,10 @@ const char* grpc_json_get_string_property(const grpc_json* json,
const char* prop_name) {
grpc_json* child;
for (child = json->child; child != nullptr; child = child->next) {
+ if (child->key == nullptr) {
+ gpr_log(GPR_ERROR, "Invalid (null) JSON key encountered");
+ return nullptr;
+ }
if (strcmp(child->key, prop_name) == 0) break;
}
if (child == nullptr || child->type != GRPC_JSON_STRING) {
diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc
index fd56997388..1f1c08b159 100644
--- a/src/core/lib/slice/slice_buffer.cc
+++ b/src/core/lib/slice/slice_buffer.cc
@@ -333,14 +333,26 @@ void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n,
size_t slice_len = GRPC_SLICE_LENGTH(slice);
if (slice_len > n) {
sb->slices[idx] = grpc_slice_split_head(&slice, slice_len - n);
- grpc_slice_buffer_add_indexed(garbage, slice);
+ if (garbage) {
+ grpc_slice_buffer_add_indexed(garbage, slice);
+ } else {
+ grpc_slice_unref_internal(slice);
+ }
return;
} else if (slice_len == n) {
- grpc_slice_buffer_add_indexed(garbage, slice);
+ if (garbage) {
+ grpc_slice_buffer_add_indexed(garbage, slice);
+ } else {
+ grpc_slice_unref_internal(slice);
+ }
sb->count = idx;
return;
} else {
- grpc_slice_buffer_add_indexed(garbage, slice);
+ if (garbage) {
+ grpc_slice_buffer_add_indexed(garbage, slice);
+ } else {
+ grpc_slice_unref_internal(slice);
+ }
n -= slice_len;
sb->count = idx;
}
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 86e0afa6ee..1cf8ea94e7 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -516,7 +516,6 @@ static void release_call(void* call, grpc_error* error) {
grpc_call* c = static_cast<grpc_call*>(call);
grpc_channel* channel = c->channel;
grpc_call_combiner_destroy(&c->call_combiner);
- gpr_free((char*)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
}
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index 6b41e4b37e..039d603394 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -184,7 +184,8 @@ void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
nullptr) {
transport->vtable->set_pollset_set(transport, stream, pollset_set);
} else {
- abort();
+ // No-op for empty pollset. Empty pollset is possible when using
+ // non-fd-based event engines such as CFStream.
}
}
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 10e9df0f7c..b2e252d939 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -168,13 +168,11 @@ struct grpc_transport_stream_op_batch_payload {
/** Iff send_initial_metadata != NULL, flags associated with
send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
uint32_t send_initial_metadata_flags;
- // If non-NULL, will be set by the transport to the peer string
- // (a char*, which the caller takes ownership of).
+ // If non-NULL, will be set by the transport to the peer string (a char*).
+ // The transport retains ownership of the string.
// Note: This pointer may be used by the transport after the
// send_initial_metadata op is completed. It must remain valid
// until the call is destroyed.
- // Note: When a transport sets this, it must free the previous
- // value, if any.
gpr_atm* peer_string;
} send_initial_metadata;
@@ -202,13 +200,11 @@ struct grpc_transport_stream_op_batch_payload {
// immediately available. This may be a signal that we received a
// Trailers-Only response.
bool* trailing_metadata_available;
- // If non-NULL, will be set by the transport to the peer string
- // (a char*, which the caller takes ownership of).
+ // If non-NULL, will be set by the transport to the peer string (a char*).
+ // The transport retains ownership of the string.
// Note: This pointer may be used by the transport after the
// recv_initial_metadata op is completed. It must remain valid
// until the call is destroyed.
- // Note: When a transport sets this, it must free the previous
- // value, if any.
gpr_atm* peer_string;
} recv_initial_metadata;