diff options
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r-- | src/core/channel/client_channel.c | 1031 |
1 files changed, 482 insertions, 549 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 97f097d301..ce03ad5eb0 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -51,8 +51,7 @@ typedef struct call_data call_data; -typedef struct -{ +typedef struct { /** metadata context for this channel */ grpc_mdctx *mdctx; /** resolver for this channel */ @@ -90,16 +89,14 @@ typedef struct to watch for state changes from the lb_policy. When a state change is seen, we update the channel, and create a new watcher */ -typedef struct -{ +typedef struct { channel_data *chand; grpc_closure on_changed; grpc_connectivity_state state; grpc_lb_policy *lb_policy; } lb_policy_connectivity_watcher; -typedef enum -{ +typedef enum { CALL_CREATED, CALL_WAITING_FOR_SEND, CALL_WAITING_FOR_CONFIG, @@ -109,8 +106,7 @@ typedef enum CALL_CANCELLED } call_state; -struct call_data -{ +struct call_data { /* owning element */ grpc_call_element *elem; @@ -127,406 +123,361 @@ struct call_data grpc_linked_mdelem details; }; -static grpc_closure * -merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_op) - GRPC_MUST_USE_RESULT; +static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, + grpc_transport_stream_op *new_op) + GRPC_MUST_USE_RESULT; - static void handle_op_after_cancellation (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) -{ +static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - if (op->send_ops) - { - grpc_stream_ops_unref_owned_objects (op->send_ops->ops, op->send_ops->nops); - op->on_done_send->cb (exec_ctx, op->on_done_send->cb_arg, 0); - } - if (op->recv_ops) - { - char status[GPR_LTOA_MIN_BUFSIZE]; - grpc_metadata_batch mdb; - gpr_ltoa (GRPC_STATUS_CANCELLED, status); - calld->status.md = grpc_mdelem_from_strings (chand->mdctx, "grpc-status", status); - calld->details.md = grpc_mdelem_from_strings (chand->mdctx, "grpc-message", "Cancelled"); - calld->status.prev = calld->details.next = NULL; - calld->status.next = &calld->details; - calld->details.prev = &calld->status; - mdb.list.head = &calld->status; - mdb.list.tail = &calld->details; - mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future (GPR_CLOCK_REALTIME); - grpc_sopb_add_metadata (op->recv_ops, mdb); - *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv->cb (exec_ctx, op->on_done_recv->cb_arg, 1); - } - if (op->on_consumed) - { - op->on_consumed->cb (exec_ctx, op->on_consumed->cb_arg, 0); - } + if (op->send_ops) { + grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); + op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0); + } + if (op->recv_ops) { + char status[GPR_LTOA_MIN_BUFSIZE]; + grpc_metadata_batch mdb; + gpr_ltoa(GRPC_STATUS_CANCELLED, status); + calld->status.md = + grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status); + calld->details.md = + grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled"); + calld->status.prev = calld->details.next = NULL; + calld->status.next = &calld->details; + calld->details.prev = &calld->status; + mdb.list.head = &calld->status; + mdb.list.tail = &calld->details; + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + grpc_sopb_add_metadata(op->recv_ops, mdb); + *op->recv_state = GRPC_STREAM_CLOSED; + op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1); + } + if (op->on_consumed) { + op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0); + } } -typedef struct -{ +typedef struct { grpc_closure closure; grpc_call_element *elem; } waiting_call; -static void perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op, int continuation); +static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op, + int continuation); -static void -continue_with_pick (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void continue_with_pick(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { waiting_call *wc = arg; call_data *calld = wc->elem->call_data; - perform_transport_stream_op (exec_ctx, wc->elem, &calld->waiting_op, 1); - gpr_free (wc); + perform_transport_stream_op(exec_ctx, wc->elem, &calld->waiting_op, 1); + gpr_free(wc); } -static void -add_to_lb_policy_wait_queue_locked_state_config (grpc_call_element * elem) -{ +static void add_to_lb_policy_wait_queue_locked_state_config( + grpc_call_element *elem) { channel_data *chand = elem->channel_data; - waiting_call *wc = gpr_malloc (sizeof (*wc)); - grpc_closure_init (&wc->closure, continue_with_pick, wc); + waiting_call *wc = gpr_malloc(sizeof(*wc)); + grpc_closure_init(&wc->closure, continue_with_pick, wc); wc->elem = elem; - grpc_closure_list_add (&chand->waiting_for_config_closures, &wc->closure, 1); + grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1); } -static int -is_empty (void *p, int len) -{ +static int is_empty(void *p, int len) { char *ptr = p; int i; - for (i = 0; i < len; i++) - { - if (ptr[i] != 0) - return 0; - } + for (i = 0; i < len; i++) { + if (ptr[i] != 0) return 0; + } return 1; } -static void -started_call (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void started_call(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { call_data *calld = arg; grpc_transport_stream_op op; int have_waiting; - gpr_mu_lock (&calld->mu_state); - if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) - { - memset (&op, 0, sizeof (op)); - op.cancel_with_status = GRPC_STATUS_CANCELLED; - gpr_mu_unlock (&calld->mu_state); - grpc_subchannel_call_process_op (exec_ctx, calld->subchannel_call, &op); - } - else if (calld->state == CALL_WAITING_FOR_CALL) - { - have_waiting = !is_empty (&calld->waiting_op, sizeof (calld->waiting_op)); - if (calld->subchannel_call != NULL) - { - calld->state = CALL_ACTIVE; - gpr_mu_unlock (&calld->mu_state); - if (have_waiting) - { - grpc_subchannel_call_process_op (exec_ctx, calld->subchannel_call, &calld->waiting_op); - } - } - else - { - calld->state = CALL_CANCELLED; - gpr_mu_unlock (&calld->mu_state); - if (have_waiting) - { - handle_op_after_cancellation (exec_ctx, calld->elem, &calld->waiting_op); - } - } - } - else - { - GPR_ASSERT (calld->state == CALL_CANCELLED); - gpr_mu_unlock (&calld->mu_state); - } + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) { + memset(&op, 0, sizeof(op)); + op.cancel_with_status = GRPC_STATUS_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op); + } else if (calld->state == CALL_WAITING_FOR_CALL) { + have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); + if (calld->subchannel_call != NULL) { + calld->state = CALL_ACTIVE; + gpr_mu_unlock(&calld->mu_state); + if (have_waiting) { + grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, + &calld->waiting_op); + } + } else { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + if (have_waiting) { + handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); + } + } + } else { + GPR_ASSERT(calld->state == CALL_CANCELLED); + gpr_mu_unlock(&calld->mu_state); + } } -static void -picked_target (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void picked_target(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { call_data *calld = arg; grpc_pollset *pollset; - if (calld->picked_channel == NULL) - { - /* treat this like a cancellation */ - calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE; - perform_transport_stream_op (exec_ctx, calld->elem, &calld->waiting_op, 1); - } - else - { - gpr_mu_lock (&calld->mu_state); - if (calld->state == CALL_CANCELLED) - { - gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (exec_ctx, calld->elem, &calld->waiting_op); - } - else - { - GPR_ASSERT (calld->state == CALL_WAITING_FOR_PICK); - calld->state = CALL_WAITING_FOR_CALL; - pollset = calld->waiting_op.bind_pollset; - gpr_mu_unlock (&calld->mu_state); - grpc_closure_init (&calld->async_setup_task, started_call, calld); - grpc_subchannel_create_call (exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call, &calld->async_setup_task); - } - } + if (calld->picked_channel == NULL) { + /* treat this like a cancellation */ + calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE; + perform_transport_stream_op(exec_ctx, calld->elem, &calld->waiting_op, 1); + } else { + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_CANCELLED) { + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); + } else { + GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK); + calld->state = CALL_WAITING_FOR_CALL; + pollset = calld->waiting_op.bind_pollset; + gpr_mu_unlock(&calld->mu_state); + grpc_closure_init(&calld->async_setup_task, started_call, calld); + grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset, + &calld->subchannel_call, + &calld->async_setup_task); + } + } } -static grpc_closure * -merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_op) -{ +static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, + grpc_transport_stream_op *new_op) { call_data *calld = elem->call_data; grpc_closure *consumed_op = NULL; grpc_transport_stream_op *waiting_op = &calld->waiting_op; - GPR_ASSERT ((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1); - GPR_ASSERT ((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1); - if (new_op->send_ops != NULL) - { - waiting_op->send_ops = new_op->send_ops; - waiting_op->is_last_send = new_op->is_last_send; - waiting_op->on_done_send = new_op->on_done_send; - } - if (new_op->recv_ops != NULL) - { - waiting_op->recv_ops = new_op->recv_ops; - waiting_op->recv_state = new_op->recv_state; - waiting_op->on_done_recv = new_op->on_done_recv; - } - if (new_op->on_consumed != NULL) - { - if (waiting_op->on_consumed != NULL) - { - consumed_op = waiting_op->on_consumed; - } - waiting_op->on_consumed = new_op->on_consumed; - } - if (new_op->cancel_with_status != GRPC_STATUS_OK) - { - waiting_op->cancel_with_status = new_op->cancel_with_status; - } + GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1); + GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1); + if (new_op->send_ops != NULL) { + waiting_op->send_ops = new_op->send_ops; + waiting_op->is_last_send = new_op->is_last_send; + waiting_op->on_done_send = new_op->on_done_send; + } + if (new_op->recv_ops != NULL) { + waiting_op->recv_ops = new_op->recv_ops; + waiting_op->recv_state = new_op->recv_state; + waiting_op->on_done_recv = new_op->on_done_recv; + } + if (new_op->on_consumed != NULL) { + if (waiting_op->on_consumed != NULL) { + consumed_op = waiting_op->on_consumed; + } + waiting_op->on_consumed = new_op->on_consumed; + } + if (new_op->cancel_with_status != GRPC_STATUS_OK) { + waiting_op->cancel_with_status = new_op->cancel_with_status; + } return consumed_op; } -static char * -cc_get_peer (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) -{ +static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; char *result; - gpr_mu_lock (&calld->mu_state); - if (calld->state == CALL_ACTIVE) - { - subchannel_call = calld->subchannel_call; - GRPC_SUBCHANNEL_CALL_REF (subchannel_call, "get_peer"); - gpr_mu_unlock (&calld->mu_state); - result = grpc_subchannel_call_get_peer (exec_ctx, subchannel_call); - GRPC_SUBCHANNEL_CALL_UNREF (exec_ctx, subchannel_call, "get_peer"); - return result; - } - else - { - gpr_mu_unlock (&calld->mu_state); - return grpc_channel_get_target (chand->master); - } + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_ACTIVE) { + subchannel_call = calld->subchannel_call; + GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer"); + gpr_mu_unlock(&calld->mu_state); + result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer"); + return result; + } else { + gpr_mu_unlock(&calld->mu_state); + return grpc_channel_get_target(chand->master); + } } -static void -perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op, int continuation) -{ +static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op, + int continuation) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; grpc_transport_stream_op op2; - GPR_ASSERT (elem->filter == &grpc_client_channel_filter); - GRPC_CALL_LOG_OP (GPR_INFO, elem, op); + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - gpr_mu_lock (&calld->mu_state); - switch (calld->state) - { + gpr_mu_lock(&calld->mu_state); + switch (calld->state) { case CALL_ACTIVE: - GPR_ASSERT (!continuation); + GPR_ASSERT(!continuation); subchannel_call = calld->subchannel_call; - gpr_mu_unlock (&calld->mu_state); - grpc_subchannel_call_process_op (exec_ctx, subchannel_call, op); + gpr_mu_unlock(&calld->mu_state); + grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op); break; case CALL_CANCELLED: - gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (exec_ctx, elem, op); + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(exec_ctx, elem, op); break; case CALL_WAITING_FOR_SEND: - GPR_ASSERT (!continuation); - grpc_exec_ctx_enqueue (exec_ctx, merge_into_waiting_op (elem, op), 1); - if (!calld->waiting_op.send_ops && calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) - { - gpr_mu_unlock (&calld->mu_state); - break; - } + GPR_ASSERT(!continuation); + grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1); + if (!calld->waiting_op.send_ops && + calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) { + gpr_mu_unlock(&calld->mu_state); + break; + } *op = calld->waiting_op; - memset (&calld->waiting_op, 0, sizeof (calld->waiting_op)); + memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); continuation = 1; - /* fall through */ + /* fall through */ case CALL_WAITING_FOR_CONFIG: case CALL_WAITING_FOR_PICK: case CALL_WAITING_FOR_CALL: - if (!continuation) - { - if (op->cancel_with_status != GRPC_STATUS_OK) - { - calld->state = CALL_CANCELLED; - op2 = calld->waiting_op; - memset (&calld->waiting_op, 0, sizeof (calld->waiting_op)); - if (op->on_consumed) - { - calld->waiting_op.on_consumed = op->on_consumed; - op->on_consumed = NULL; - } - else if (op2.on_consumed) - { - calld->waiting_op.on_consumed = op2.on_consumed; - op2.on_consumed = NULL; - } - gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (exec_ctx, elem, op); - handle_op_after_cancellation (exec_ctx, elem, &op2); - } - else - { - grpc_exec_ctx_enqueue (exec_ctx, merge_into_waiting_op (elem, op), 1); - gpr_mu_unlock (&calld->mu_state); - } - break; - } - /* fall through */ + if (!continuation) { + if (op->cancel_with_status != GRPC_STATUS_OK) { + calld->state = CALL_CANCELLED; + op2 = calld->waiting_op; + memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); + if (op->on_consumed) { + calld->waiting_op.on_consumed = op->on_consumed; + op->on_consumed = NULL; + } else if (op2.on_consumed) { + calld->waiting_op.on_consumed = op2.on_consumed; + op2.on_consumed = NULL; + } + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(exec_ctx, elem, op); + handle_op_after_cancellation(exec_ctx, elem, &op2); + } else { + grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1); + gpr_mu_unlock(&calld->mu_state); + } + break; + } + /* fall through */ case CALL_CREATED: - if (op->cancel_with_status != GRPC_STATUS_OK) - { - calld->state = CALL_CANCELLED; - gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (exec_ctx, elem, op); - } - else - { - calld->waiting_op = *op; - - if (op->send_ops == NULL) - { - /* need to have some send ops before we can select the - lb target */ - calld->state = CALL_WAITING_FOR_SEND; - gpr_mu_unlock (&calld->mu_state); - } - else - { - gpr_mu_lock (&chand->mu_config); - lb_policy = chand->lb_policy; - if (lb_policy) - { - grpc_transport_stream_op *op = &calld->waiting_op; - grpc_pollset *bind_pollset = op->bind_pollset; - grpc_metadata_batch *initial_metadata = &op->send_ops->ops[0].data.metadata; - GRPC_LB_POLICY_REF (lb_policy, "pick"); - gpr_mu_unlock (&chand->mu_config); - calld->state = CALL_WAITING_FOR_PICK; - - GPR_ASSERT (op->bind_pollset); - GPR_ASSERT (op->send_ops); - GPR_ASSERT (op->send_ops->nops >= 1); - GPR_ASSERT (op->send_ops->ops[0].type == GRPC_OP_METADATA); - gpr_mu_unlock (&calld->mu_state); - - grpc_closure_init (&calld->async_setup_task, picked_target, calld); - grpc_lb_policy_pick (exec_ctx, lb_policy, bind_pollset, initial_metadata, &calld->picked_channel, &calld->async_setup_task); - - GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "pick"); - } - else if (chand->resolver != NULL) - { - calld->state = CALL_WAITING_FOR_CONFIG; - add_to_lb_policy_wait_queue_locked_state_config (elem); - if (!chand->started_resolving && chand->resolver != NULL) - { - GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); - chand->started_resolving = 1; - grpc_resolver_next (exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); - } - gpr_mu_unlock (&chand->mu_config); - gpr_mu_unlock (&calld->mu_state); - } - else - { - calld->state = CALL_CANCELLED; - gpr_mu_unlock (&chand->mu_config); - gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (exec_ctx, elem, op); - } - } - } + if (op->cancel_with_status != GRPC_STATUS_OK) { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(exec_ctx, elem, op); + } else { + calld->waiting_op = *op; + + if (op->send_ops == NULL) { + /* need to have some send ops before we can select the + lb target */ + calld->state = CALL_WAITING_FOR_SEND; + gpr_mu_unlock(&calld->mu_state); + } else { + gpr_mu_lock(&chand->mu_config); + lb_policy = chand->lb_policy; + if (lb_policy) { + grpc_transport_stream_op *op = &calld->waiting_op; + grpc_pollset *bind_pollset = op->bind_pollset; + grpc_metadata_batch *initial_metadata = + &op->send_ops->ops[0].data.metadata; + GRPC_LB_POLICY_REF(lb_policy, "pick"); + gpr_mu_unlock(&chand->mu_config); + calld->state = CALL_WAITING_FOR_PICK; + + GPR_ASSERT(op->bind_pollset); + GPR_ASSERT(op->send_ops); + GPR_ASSERT(op->send_ops->nops >= 1); + GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA); + gpr_mu_unlock(&calld->mu_state); + + grpc_closure_init(&calld->async_setup_task, picked_target, calld); + grpc_lb_policy_pick(exec_ctx, lb_policy, bind_pollset, + initial_metadata, &calld->picked_channel, + &calld->async_setup_task); + + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick"); + } else if (chand->resolver != NULL) { + calld->state = CALL_WAITING_FOR_CONFIG; + add_to_lb_policy_wait_queue_locked_state_config(elem); + if (!chand->started_resolving && chand->resolver != NULL) { + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + chand->started_resolving = 1; + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_configuration, + &chand->on_config_changed); + } + gpr_mu_unlock(&chand->mu_config); + gpr_mu_unlock(&calld->mu_state); + } else { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&chand->mu_config); + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(exec_ctx, elem, op); + } + } + } break; - } + } } -static void -cc_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) -{ - perform_transport_stream_op (exec_ctx, elem, op, 0); +static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + perform_transport_stream_op(exec_ctx, elem, op, 0); } -static void watch_lb_policy (grpc_exec_ctx * exec_ctx, channel_data * chand, grpc_lb_policy * lb_policy, grpc_connectivity_state current_state); +static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, + grpc_lb_policy *lb_policy, + grpc_connectivity_state current_state); -static void -on_lb_policy_state_changed_locked (grpc_exec_ctx * exec_ctx, lb_policy_connectivity_watcher * w) -{ +static void on_lb_policy_state_changed_locked( + grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) { /* check if the notification is for a stale policy */ - if (w->lb_policy != w->chand->lb_policy) - return; + if (w->lb_policy != w->chand->lb_policy) return; - grpc_connectivity_state_set (exec_ctx, &w->chand->state_tracker, w->state, "lb_changed"); - if (w->state != GRPC_CHANNEL_FATAL_FAILURE) - { - watch_lb_policy (exec_ctx, w->chand, w->lb_policy, w->state); - } + grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state, + "lb_changed"); + if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { + watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + } } -static void -on_lb_policy_state_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { lb_policy_connectivity_watcher *w = arg; - gpr_mu_lock (&w->chand->mu_config); - on_lb_policy_state_changed_locked (exec_ctx, w); - gpr_mu_unlock (&w->chand->mu_config); + gpr_mu_lock(&w->chand->mu_config); + on_lb_policy_state_changed_locked(exec_ctx, w); + gpr_mu_unlock(&w->chand->mu_config); - GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, w->chand->master, "watch_lb_policy"); - gpr_free (w); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy"); + gpr_free(w); } -static void -watch_lb_policy (grpc_exec_ctx * exec_ctx, channel_data * chand, grpc_lb_policy * lb_policy, grpc_connectivity_state current_state) -{ - lb_policy_connectivity_watcher *w = gpr_malloc (sizeof (*w)); - GRPC_CHANNEL_INTERNAL_REF (chand->master, "watch_lb_policy"); +static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, + grpc_lb_policy *lb_policy, + grpc_connectivity_state current_state) { + lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); w->chand = chand; - grpc_closure_init (&w->on_changed, on_lb_policy_state_changed, w); + grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w); w->state = current_state; w->lb_policy = lb_policy; - grpc_lb_policy_notify_on_state_change (exec_ctx, lb_policy, &w->state, &w->on_changed); + grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, + &w->on_changed); } -static void -cc_on_config_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; @@ -534,313 +485,295 @@ cc_on_config_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; int exit_idle = 0; - if (chand->incoming_configuration != NULL) - { - lb_policy = grpc_client_config_get_lb_policy (chand->incoming_configuration); - if (lb_policy != NULL) - { - GRPC_LB_POLICY_REF (lb_policy, "channel"); - GRPC_LB_POLICY_REF (lb_policy, "config_change"); - state = grpc_lb_policy_check_connectivity (exec_ctx, lb_policy); - } - - grpc_client_config_unref (exec_ctx, chand->incoming_configuration); + if (chand->incoming_configuration != NULL) { + lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); + if (lb_policy != NULL) { + GRPC_LB_POLICY_REF(lb_policy, "channel"); + GRPC_LB_POLICY_REF(lb_policy, "config_change"); + state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy); } + grpc_client_config_unref(exec_ctx, chand->incoming_configuration); + } + chand->incoming_configuration = NULL; - gpr_mu_lock (&chand->mu_config); + gpr_mu_lock(&chand->mu_config); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; - if (lb_policy != NULL || chand->resolver == NULL /* disconnected */ ) - { - grpc_exec_ctx_enqueue_list (exec_ctx, &chand->waiting_for_config_closures); - } - if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) - { - GRPC_LB_POLICY_REF (lb_policy, "exit_idle"); - exit_idle = 1; - chand->exit_idle_when_lb_policy_arrives = 0; - } - - if (iomgr_success && chand->resolver) - { - grpc_resolver *resolver = chand->resolver; - GRPC_RESOLVER_REF (resolver, "channel-next"); - grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, state, "new_lb+resolver"); - if (lb_policy != NULL) - { - watch_lb_policy (exec_ctx, chand, lb_policy, state); - } - gpr_mu_unlock (&chand->mu_config); - GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); - grpc_resolver_next (exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); - GRPC_RESOLVER_UNREF (exec_ctx, resolver, "channel-next"); - } - else - { - old_resolver = chand->resolver; - chand->resolver = NULL; - grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); - gpr_mu_unlock (&chand->mu_config); - if (old_resolver != NULL) - { - grpc_resolver_shutdown (exec_ctx, old_resolver); - GRPC_RESOLVER_UNREF (exec_ctx, old_resolver, "channel"); - } - } - - if (exit_idle) - { - grpc_lb_policy_exit_idle (exec_ctx, lb_policy); - GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "exit_idle"); - } - - if (old_lb_policy != NULL) - { - grpc_lb_policy_shutdown (exec_ctx, old_lb_policy); - GRPC_LB_POLICY_UNREF (exec_ctx, old_lb_policy, "channel"); - } - - if (lb_policy != NULL) - { - GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "config_change"); - } - - GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, chand->master, "resolver"); + if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { + grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures); + } + if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { + GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); + exit_idle = 1; + chand->exit_idle_when_lb_policy_arrives = 0; + } + + if (iomgr_success && chand->resolver) { + grpc_resolver *resolver = chand->resolver; + GRPC_RESOLVER_REF(resolver, "channel-next"); + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, + "new_lb+resolver"); + if (lb_policy != NULL) { + watch_lb_policy(exec_ctx, chand, lb_policy, state); + } + gpr_mu_unlock(&chand->mu_config); + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, + &chand->on_config_changed); + GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next"); + } else { + old_resolver = chand->resolver; + chand->resolver = NULL; + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); + gpr_mu_unlock(&chand->mu_config); + if (old_resolver != NULL) { + grpc_resolver_shutdown(exec_ctx, old_resolver); + GRPC_RESOLVER_UNREF(exec_ctx, old_resolver, "channel"); + } + } + + if (exit_idle) { + grpc_lb_policy_exit_idle(exec_ctx, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle"); + } + + if (old_lb_policy != NULL) { + grpc_lb_policy_shutdown(exec_ctx, old_lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); + } + + if (lb_policy != NULL) { + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change"); + } + + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver"); } -static void -cc_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op) -{ +static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { grpc_lb_policy *lb_policy = NULL; channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; - grpc_exec_ctx_enqueue (exec_ctx, op->on_consumed, 1); - - GPR_ASSERT (op->set_accept_stream == NULL); - GPR_ASSERT (op->bind_pollset == NULL); - - gpr_mu_lock (&chand->mu_config); - if (op->on_connectivity_state_change != NULL) - { - grpc_connectivity_state_notify_on_state_change (exec_ctx, &chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change); - op->on_connectivity_state_change = NULL; - op->connectivity_state = NULL; - } - - if (!is_empty (op, sizeof (*op))) - { - lb_policy = chand->lb_policy; - if (lb_policy) - { - GRPC_LB_POLICY_REF (lb_policy, "broadcast"); - } - } - - if (op->disconnect && chand->resolver != NULL) - { - grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); - destroy_resolver = chand->resolver; - chand->resolver = NULL; - if (chand->lb_policy != NULL) - { - grpc_lb_policy_shutdown (exec_ctx, chand->lb_policy); - GRPC_LB_POLICY_UNREF (exec_ctx, chand->lb_policy, "channel"); - chand->lb_policy = NULL; - } - } - gpr_mu_unlock (&chand->mu_config); - - if (destroy_resolver) - { - grpc_resolver_shutdown (exec_ctx, destroy_resolver); - GRPC_RESOLVER_UNREF (exec_ctx, destroy_resolver, "channel"); - } - - if (lb_policy) - { - grpc_lb_policy_broadcast (exec_ctx, lb_policy, op); - GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "broadcast"); - } + grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); + + GPR_ASSERT(op->set_accept_stream == NULL); + GPR_ASSERT(op->bind_pollset == NULL); + + gpr_mu_lock(&chand->mu_config); + if (op->on_connectivity_state_change != NULL) { + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &chand->state_tracker, op->connectivity_state, + op->on_connectivity_state_change); + op->on_connectivity_state_change = NULL; + op->connectivity_state = NULL; + } + + if (!is_empty(op, sizeof(*op))) { + lb_policy = chand->lb_policy; + if (lb_policy) { + GRPC_LB_POLICY_REF(lb_policy, "broadcast"); + } + } + + if (op->disconnect && chand->resolver != NULL) { + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); + destroy_resolver = chand->resolver; + chand->resolver = NULL; + if (chand->lb_policy != NULL) { + grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); + chand->lb_policy = NULL; + } + } + gpr_mu_unlock(&chand->mu_config); + + if (destroy_resolver) { + grpc_resolver_shutdown(exec_ctx, destroy_resolver); + GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel"); + } + + if (lb_policy) { + grpc_lb_policy_broadcast(exec_ctx, lb_policy, op); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast"); + } } /* Constructor for call_data */ -static void -init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op) -{ +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const void *server_transport_data, + grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; /* TODO(ctiller): is there something useful we can do here? */ - GPR_ASSERT (initial_op == NULL); + GPR_ASSERT(initial_op == NULL); - GPR_ASSERT (elem->filter == &grpc_client_channel_filter); - GPR_ASSERT (server_transport_data == NULL); - gpr_mu_init (&calld->mu_state); + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + GPR_ASSERT(server_transport_data == NULL); + gpr_mu_init(&calld->mu_state); calld->elem = elem; calld->state = CALL_CREATED; - calld->deadline = gpr_inf_future (GPR_CLOCK_REALTIME); + calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } /* Destructor for call_data */ -static void -destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) -{ +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { call_data *calld = elem->call_data; grpc_subchannel_call *subchannel_call; /* if the call got activated, we need to destroy the child stack also, and remove it from the in-flight requests tracked by the child_entry we picked */ - gpr_mu_lock (&calld->mu_state); - switch (calld->state) - { + gpr_mu_lock(&calld->mu_state); + switch (calld->state) { case CALL_ACTIVE: subchannel_call = calld->subchannel_call; - gpr_mu_unlock (&calld->mu_state); - GRPC_SUBCHANNEL_CALL_UNREF (exec_ctx, subchannel_call, "client_channel"); + gpr_mu_unlock(&calld->mu_state); + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_channel"); break; case CALL_CREATED: case CALL_CANCELLED: - gpr_mu_unlock (&calld->mu_state); + gpr_mu_unlock(&calld->mu_state); break; case CALL_WAITING_FOR_PICK: case CALL_WAITING_FOR_CONFIG: case CALL_WAITING_FOR_CALL: case CALL_WAITING_FOR_SEND: - gpr_log (GPR_ERROR, "should never reach here"); - abort (); + gpr_log(GPR_ERROR, "should never reach here"); + abort(); break; - } + } } /* Constructor for channel_data */ -static void -init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, int is_first, int is_last) -{ +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_channel *master, + const grpc_channel_args *args, + grpc_mdctx *metadata_context, int is_first, + int is_last) { channel_data *chand = elem->channel_data; - memset (chand, 0, sizeof (*chand)); + memset(chand, 0, sizeof(*chand)); - GPR_ASSERT (is_last); - GPR_ASSERT (elem->filter == &grpc_client_channel_filter); + GPR_ASSERT(is_last); + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); - gpr_mu_init (&chand->mu_config); + gpr_mu_init(&chand->mu_config); chand->mdctx = metadata_context; chand->master = master; - grpc_pollset_set_init (&chand->pollset_set); - grpc_closure_init (&chand->on_config_changed, cc_on_config_changed, chand); + grpc_pollset_set_init(&chand->pollset_set); + grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); - grpc_connectivity_state_init (&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); + grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, + "client_channel"); } /* Destructor for channel_data */ -static void -destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem) -{ +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - if (chand->resolver != NULL) - { - grpc_resolver_shutdown (exec_ctx, chand->resolver); - GRPC_RESOLVER_UNREF (exec_ctx, chand->resolver, "channel"); - } - if (chand->lb_policy != NULL) - { - GRPC_LB_POLICY_UNREF (exec_ctx, chand->lb_policy, "channel"); - } - grpc_connectivity_state_destroy (exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy (&chand->pollset_set); - gpr_mu_destroy (&chand->mu_config); + if (chand->resolver != NULL) { + grpc_resolver_shutdown(exec_ctx, chand->resolver); + GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + } + if (chand->lb_policy != NULL) { + GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); + } + grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); + grpc_pollset_set_destroy(&chand->pollset_set); + gpr_mu_destroy(&chand->mu_config); } const grpc_channel_filter grpc_client_channel_filter = { - cc_start_transport_stream_op, - cc_start_transport_op, - sizeof (call_data), - init_call_elem, - destroy_call_elem, - sizeof (channel_data), - init_channel_elem, - destroy_channel_elem, - cc_get_peer, - "client-channel", + cc_start_transport_stream_op, + cc_start_transport_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + cc_get_peer, + "client-channel", }; -void -grpc_client_channel_set_resolver (grpc_exec_ctx * exec_ctx, grpc_channel_stack * channel_stack, grpc_resolver * resolver) -{ +void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, + grpc_resolver *resolver) { /* post construction initialization: set the transport setup pointer */ - grpc_channel_element *elem = grpc_channel_stack_last_element (channel_stack); + grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *chand = elem->channel_data; - gpr_mu_lock (&chand->mu_config); - GPR_ASSERT (!chand->resolver); + gpr_mu_lock(&chand->mu_config); + GPR_ASSERT(!chand->resolver); chand->resolver = resolver; - GRPC_RESOLVER_REF (resolver, "channel"); - if (!grpc_closure_list_empty (chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives) - { - chand->started_resolving = 1; - GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); - grpc_resolver_next (exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); - } - gpr_mu_unlock (&chand->mu_config); + GRPC_RESOLVER_REF(resolver, "channel"); + if (!grpc_closure_list_empty(chand->waiting_for_config_closures) || + chand->exit_idle_when_lb_policy_arrives) { + chand->started_resolving = 1; + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, + &chand->on_config_changed); + } + gpr_mu_unlock(&chand->mu_config); } -grpc_connectivity_state -grpc_client_channel_check_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, int try_to_connect) -{ +grpc_connectivity_state grpc_client_channel_check_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { channel_data *chand = elem->channel_data; grpc_connectivity_state out; - gpr_mu_lock (&chand->mu_config); - out = grpc_connectivity_state_check (&chand->state_tracker); - if (out == GRPC_CHANNEL_IDLE && try_to_connect) - { - if (chand->lb_policy != NULL) - { - grpc_lb_policy_exit_idle (exec_ctx, chand->lb_policy); - } - else - { - chand->exit_idle_when_lb_policy_arrives = 1; - if (!chand->started_resolving && chand->resolver != NULL) - { - GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); - chand->started_resolving = 1; - grpc_resolver_next (exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); - } - } - } - gpr_mu_unlock (&chand->mu_config); + gpr_mu_lock(&chand->mu_config); + out = grpc_connectivity_state_check(&chand->state_tracker); + if (out == GRPC_CHANNEL_IDLE && try_to_connect) { + if (chand->lb_policy != NULL) { + grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); + } else { + chand->exit_idle_when_lb_policy_arrives = 1; + if (!chand->started_resolving && chand->resolver != NULL) { + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + chand->started_resolving = 1; + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_configuration, + &chand->on_config_changed); + } + } + } + gpr_mu_unlock(&chand->mu_config); return out; } -void -grpc_client_channel_watch_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_connectivity_state * state, grpc_closure * on_complete) -{ +void grpc_client_channel_watch_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_connectivity_state *state, grpc_closure *on_complete) { channel_data *chand = elem->channel_data; - gpr_mu_lock (&chand->mu_config); - grpc_connectivity_state_notify_on_state_change (exec_ctx, &chand->state_tracker, state, on_complete); - gpr_mu_unlock (&chand->mu_config); + gpr_mu_lock(&chand->mu_config); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &chand->state_tracker, state, on_complete); + gpr_mu_unlock(&chand->mu_config); } -grpc_pollset_set * -grpc_client_channel_get_connecting_pollset_set (grpc_channel_element * elem) -{ +grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( + grpc_channel_element *elem) { channel_data *chand = elem->channel_data; return &chand->pollset_set; } -void -grpc_client_channel_add_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_pollset * pollset) -{ +void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_pollset *pollset) { channel_data *chand = elem->channel_data; - grpc_pollset_set_add_pollset (exec_ctx, &chand->pollset_set, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset); } -void -grpc_client_channel_del_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_pollset * pollset) -{ +void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_pollset *pollset) { channel_data *chand = elem->channel_data; - grpc_pollset_set_del_pollset (exec_ctx, &chand->pollset_set, pollset); + grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset); } |