diff options
author | yang-g <yangg@google.com> | 2017-01-04 13:11:18 -0800 |
---|---|---|
committer | yang-g <yangg@google.com> | 2017-01-04 13:11:18 -0800 |
commit | de3b8a8efb9dedddf8f728d60ee5a6833b6894ec (patch) | |
tree | d6a692f6f47a1aaa2372791d6925f7e35644c929 /src/core/lib/surface | |
parent | 8bd6a0542b7d3ad08497e14fce5470cb50c7bc87 (diff) | |
parent | 360f5d2abf8cb0f92aeeb3a1b201d2eb3bd13374 (diff) |
Merge branch 'master' into health
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.c | 29 | ||||
-rw-r--r-- | src/core/lib/surface/channel_ping.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 3 | ||||
-rw-r--r-- | src/core/lib/surface/lame_client.c | 10 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 53 |
5 files changed, 56 insertions, 41 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 8ca3cab9d5..b20801005a 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -794,7 +794,8 @@ static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { memset(&tc->op, 0, sizeof(tc->op)); tc->op.cancel_error = tc->error; /* reuse closure to catch completion */ - grpc_closure_init(&tc->closure, done_termination, tc); + grpc_closure_init(&tc->closure, done_termination, tc, + grpc_schedule_on_exec_ctx); tc->op.on_complete = &tc->closure; execute_op(exec_ctx, tc->call, &tc->op); } @@ -804,7 +805,8 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { memset(&tc->op, 0, sizeof(tc->op)); tc->op.close_error = tc->error; /* reuse closure to catch completion */ - grpc_closure_init(&tc->closure, done_termination, tc); + grpc_closure_init(&tc->closure, done_termination, tc, + grpc_schedule_on_exec_ctx); tc->op.on_complete = &tc->closure; execute_op(exec_ctx, tc->call, &tc->op); } @@ -814,13 +816,13 @@ static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, set_status_from_error(tc->call, STATUS_FROM_API_OVERRIDE, tc->error); if (tc->type == TC_CANCEL) { - grpc_closure_init(&tc->closure, send_cancel, tc); + grpc_closure_init(&tc->closure, send_cancel, tc, grpc_schedule_on_exec_ctx); GRPC_CALL_INTERNAL_REF(tc->call, "cancel"); } else if (tc->type == TC_CLOSE) { - grpc_closure_init(&tc->closure, send_close, tc); + grpc_closure_init(&tc->closure, send_close, tc, grpc_schedule_on_exec_ctx); GRPC_CALL_INTERNAL_REF(tc->call, "close"); } - grpc_exec_ctx_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE); return GRPC_CALL_OK; } @@ -1138,8 +1140,8 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, } else { *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); } - grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, - bctl); + grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, bctl, + grpc_schedule_on_exec_ctx); continue_receiving_slices(exec_ctx, bctl); } } @@ -1251,9 +1253,10 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, call->has_initial_md_been_received = true; if (call->saved_receiving_stream_ready_bctlp != NULL) { grpc_closure *saved_rsr_closure = grpc_closure_create( - receiving_stream_ready, call->saved_receiving_stream_ready_bctlp); + receiving_stream_ready, call->saved_receiving_stream_ready_bctlp, + grpc_schedule_on_exec_ctx); call->saved_receiving_stream_ready_bctlp = NULL; - grpc_exec_ctx_sched(exec_ctx, saved_rsr_closure, error, NULL); + grpc_closure_sched(exec_ctx, saved_rsr_closure, error); } gpr_mu_unlock(&call->mu); @@ -1558,7 +1561,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->received_initial_metadata = 1; call->buffered_metadata[0] = op->data.recv_initial_metadata; grpc_closure_init(&call->receiving_initial_metadata_ready, - receiving_initial_metadata_ready, bctl); + receiving_initial_metadata_ready, bctl, + grpc_schedule_on_exec_ctx); bctl->recv_initial_metadata = 1; stream_op->recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; @@ -1581,7 +1585,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->receiving_buffer = op->data.recv_message; stream_op->recv_message = &call->receiving_stream; grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, - bctl); + bctl, grpc_schedule_on_exec_ctx); stream_op->recv_message_ready = &call->receiving_stream_ready; num_completion_callbacks_needed++; break; @@ -1646,7 +1650,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); stream_op->context = call->context; - grpc_closure_init(&bctl->finish_batch, finish_batch, bctl); + grpc_closure_init(&bctl->finish_batch, finish_batch, bctl, + grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch; gpr_mu_unlock(&call->mu); diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 0d2f01a649..e68febdddf 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -71,7 +71,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, GPR_ASSERT(reserved == NULL); pr->tag = tag; pr->cq = cq; - grpc_closure_init(&pr->closure, ping_done, pr); + grpc_closure_init(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx); op->send_ping = &pr->closure; op->bind_pollset = grpc_cq_pollset(cq); grpc_cq_begin_op(cq, tag); diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 184c1a1a16..4613c9021e 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -168,7 +168,8 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { #ifndef NDEBUG cc->outstanding_tag_count = 0; #endif - grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc); + grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc, + grpc_schedule_on_exec_ctx); GPR_TIMER_END("grpc_completion_queue_create", 0); diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 57da94ac1e..f1ad13711a 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -98,16 +98,16 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->on_connectivity_state_change) { GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_SHUTDOWN); *op->connectivity_state = GRPC_CHANNEL_SHUTDOWN; - grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, op->on_connectivity_state_change, + GRPC_ERROR_NONE); } if (op->send_ping != NULL) { - grpc_exec_ctx_sched(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("lame client channel"), NULL); + grpc_closure_sched(exec_ctx, op->send_ping, + GRPC_ERROR_CREATE("lame client channel")); } GRPC_ERROR_UNREF(op->disconnect_with_error); if (op->on_consumed != NULL) { - grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); } } diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 62d7afc8da..78699e9e65 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -278,7 +278,8 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, int send_goaway, grpc_error *send_disconnect) { struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc)); - grpc_closure_init(&sc->closure, shutdown_cleanup, sc); + grpc_closure_init(&sc->closure, shutdown_cleanup, sc, + grpc_schedule_on_exec_ctx); grpc_transport_op *op = grpc_make_transport_op(&sc->closure); grpc_channel_element *elem; @@ -346,9 +347,9 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&calld->mu_state); grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); } } @@ -440,8 +441,8 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, orphan_channel(chand); server_ref(chand->server); maybe_finish_shutdown(exec_ctx, chand->server); - chand->finish_destroy_channel_closure.cb = finish_destroy_channel; - chand->finish_destroy_channel_closure.cb_arg = chand; + grpc_closure_init(&chand->finish_destroy_channel_closure, + finish_destroy_channel, chand, grpc_schedule_on_exec_ctx); if (grpc_server_channel_trace && error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(error); @@ -545,8 +546,9 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&calld->mu_state); grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, error, NULL); + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, error); return; } @@ -590,9 +592,9 @@ static void finish_start_new_rpc( gpr_mu_lock(&calld->mu_state); calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); return; } @@ -607,7 +609,8 @@ static void finish_start_new_rpc( memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_MESSAGE; op.data.recv_message = &calld->payload; - grpc_closure_init(&calld->publish, publish_new_rpc, elem); + grpc_closure_init(&calld->publish, publish_new_rpc, elem, + grpc_schedule_on_exec_ctx); grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1, &calld->publish); break; @@ -813,9 +816,10 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, - GRPC_ERROR_NONE, NULL); + grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -851,7 +855,8 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_INITIAL_METADATA; op.data.recv_initial_metadata = &calld->initial_metadata; - grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem); + grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem, + grpc_schedule_on_exec_ctx); grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1, &calld->got_initial_metadata); } @@ -887,7 +892,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, gpr_mu_init(&calld->mu_state); grpc_closure_init(&calld->server_on_recv_initial_metadata, - server_on_recv_initial_metadata, elem); + server_on_recv_initial_metadata, elem, + grpc_schedule_on_exec_ctx); server_ref(chand->server); return GRPC_ERROR_NONE; @@ -926,7 +932,8 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, chand->registered_methods = NULL; chand->connectivity_state = GRPC_CHANNEL_IDLE; grpc_closure_init(&chand->channel_connectivity_changed, - channel_connectivity_changed, chand); + channel_connectivity_changed, chand, + grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } @@ -1278,7 +1285,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { - grpc_closure_init(&l->destroy_done, listener_destroy_done, server); + grpc_closure_init(&l->destroy_done, listener_destroy_done, server, + grpc_schedule_on_exec_ctx); l->destroy(&exec_ctx, server, l->arg, &l->destroy_done); } @@ -1384,9 +1392,10 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&calld->mu_state); grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, - GRPC_ERROR_NONE, NULL); + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; |