author    | Craig Tiller <ctiller@google.com>                              | 2017-09-21 13:34:08 -0700
---|---|---
committer | Craig Tiller <ctiller@google.com>                              | 2017-09-21 13:34:08 -0700
commit    | 965d4159cb9d057023f204b020a8b9802349822a (patch)               |
tree      | 5892f658ac3422bb37c35db15c8edc253c484c6a /src/core/lib/surface |
parent    | f6343040bfa590881716e9820ab4779e931cd82f (diff)                |
parent    | 88b8486f79d501c1ab8b72ac3092bd107656d6d6 (diff)                |
Merge branch 'grpc_millis' into flowctl+millis
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/alarm.c            |  3
-rw-r--r-- | src/core/lib/surface/call.c             | 54
-rw-r--r-- | src/core/lib/surface/call.h             |  2
-rw-r--r-- | src/core/lib/surface/channel.c          |  9
-rw-r--r-- | src/core/lib/surface/channel.h          |  2
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 49
-rw-r--r-- | src/core/lib/surface/lame_client.cc     |  2
-rw-r--r-- | src/core/lib/surface/server.c           | 16
8 files changed, 65 insertions, 72 deletions
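Note: the diff below carries the grpc_millis migration into the surface layer. Deadlines held as gpr_timespec structs become flat grpc_millis values, converted at the public API boundary with grpc_timespec_to_millis_round_up() and converted back with grpc_millis_to_timespec() where a timespec must be handed out again. As a rough, self-contained sketch of what such a round-up conversion involves (the demo_* types and helper below are illustrative stand-ins, not the actual gRPC implementation, which also accounts for clock type and an internal epoch):

#include <stdint.h>

/* Illustrative stand-ins for gpr_timespec / grpc_millis. */
typedef struct {
  int64_t tv_sec;
  int32_t tv_nsec;
} demo_timespec;
typedef int64_t demo_millis;

#define DEMO_MILLIS_INF_FUTURE INT64_MAX

/* Hypothetical sketch of a round-up conversion: the deadline is rounded
   toward the future so that a timer or poll never fires before the
   requested time. */
static demo_millis demo_timespec_to_millis_round_up(demo_timespec ts) {
  if (ts.tv_sec >= INT64_MAX / 1000) {
    return DEMO_MILLIS_INF_FUTURE; /* saturate instead of overflowing */
  }
  return ts.tv_sec * 1000 + (ts.tv_nsec + 999999) / 1000000;
}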
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index 7712f560b9..e3bcbb9d80 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -122,8 +122,7 @@ void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq,
   GPR_ASSERT(grpc_cq_begin_op(cq, tag));
 
   grpc_timer_init(&exec_ctx, &alarm->alarm,
-                  gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
-                  &alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
+                  grpc_timespec_to_millis_round_up(deadline), &alarm->on_alarm);
 
   grpc_exec_ctx_finish(&exec_ctx);
 }
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 1390f63896..2871dea6ea 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -214,7 +214,7 @@ struct grpc_call {
      server, it's trailing metadata */
   grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
   int send_extra_metadata_count;
-  gpr_timespec send_deadline;
+  grpc_millis send_deadline;
 
   grpc_slice_buffer_stream sending_stream;
 
@@ -281,7 +281,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
                          grpc_error *error);
 static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
                                   grpc_error *error);
-static void get_final_status(grpc_call *call,
+static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call,
                              void (*set_value)(grpc_status_code code,
                                                void *user_data),
                              void *set_value_user_data, grpc_slice *details);
@@ -370,11 +370,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
   }
   for (i = 0; i < 2; i++) {
     for (j = 0; j < 2; j++) {
-      call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+      call->metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
     }
   }
-  gpr_timespec send_deadline =
-      gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC);
+  grpc_millis send_deadline = args->send_deadline;
 
   bool immediately_cancel = false;
 
@@ -392,10 +391,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
     gpr_mu_lock(&pc->child_list_mu);
 
     if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
-      send_deadline = gpr_time_min(
-          gpr_convert_clock_type(send_deadline,
-                                 args->parent->send_deadline.clock_type),
-          args->parent->send_deadline);
+      send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
     }
     /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
      * GRPC_PROPAGATE_STATS_CONTEXT */
@@ -550,8 +546,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
     GRPC_CQ_INTERNAL_UNREF(exec_ctx, c->cq, "bind");
   }
 
-  get_final_status(c, set_status_value_directly, &c->final_info.final_status,
-                   NULL);
+  get_final_status(exec_ctx, c, set_status_value_directly,
+                   &c->final_info.final_status, NULL);
   c->final_info.stats.latency =
       gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
 
@@ -737,13 +733,16 @@ static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
  * FINAL STATUS CODE MANIPULATION
  */
 
-static bool get_final_status_from(
-    grpc_call *call, grpc_error *error, bool allow_ok_status,
-    void (*set_value)(grpc_status_code code, void *user_data),
-    void *set_value_user_data, grpc_slice *details) {
+static bool get_final_status_from(grpc_exec_ctx *exec_ctx, grpc_call *call,
+                                  grpc_error *error, bool allow_ok_status,
+                                  void (*set_value)(grpc_status_code code,
+                                                    void *user_data),
+                                  void *set_value_user_data,
+                                  grpc_slice *details) {
   grpc_status_code code;
   grpc_slice slice = grpc_empty_slice();
-  grpc_error_get_status(error, call->send_deadline, &code, &slice, NULL);
+  grpc_error_get_status(exec_ctx, error, call->send_deadline, &code, &slice,
+                        NULL);
   if (code == GRPC_STATUS_OK && !allow_ok_status) {
     return false;
   }
@@ -755,7 +754,7 @@ static bool get_final_status_from(
   return true;
 }
 
-static void get_final_status(grpc_call *call,
+static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call,
                              void (*set_value)(grpc_status_code code,
                                                void *user_data),
                              void *set_value_user_data, grpc_slice *details) {
@@ -780,8 +779,9 @@ static void get_final_status(grpc_call *call,
   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
     if (status[i].is_set &&
         grpc_error_has_clear_grpc_status(status[i].error)) {
-      if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
-                                set_value, set_value_user_data, details)) {
+      if (get_final_status_from(exec_ctx, call, status[i].error,
+                                allow_ok_status != 0, set_value,
+                                set_value_user_data, details)) {
         return;
       }
     }
@@ -789,8 +789,9 @@ static void get_final_status(grpc_call *call,
   /* If no clearly defined status exists, search for 'anything' */
   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
     if (status[i].is_set) {
-      if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
-                                set_value, set_value_user_data, details)) {
+      if (get_final_status_from(exec_ctx, call, status[i].error,
+                                allow_ok_status != 0, set_value,
+                                set_value_user_data, details)) {
         return;
       }
     }
@@ -1331,11 +1332,11 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
     }
 
     if (call->is_client) {
-      get_final_status(call, set_status_value_directly,
+      get_final_status(exec_ctx, call, set_status_value_directly,
                        call->final_op.client.status,
                        call->final_op.client.status_details);
     } else {
-      get_final_status(call, set_cancelled_value,
+      get_final_status(exec_ctx, call, set_cancelled_value,
                        call->final_op.server.cancelled, NULL);
     }
 
@@ -1617,11 +1618,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
     validate_filtered_metadata(exec_ctx, bctl);
     GPR_TIMER_END("validate_filtered_metadata", 0);
 
-    if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
-            0 &&
-        !call->is_client) {
-      call->send_deadline =
-          gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC);
+    if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
+      call->send_deadline = md->deadline;
     }
   }
 
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index c680139cf6..27c2f5243c 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -49,7 +49,7 @@ typedef struct grpc_call_create_args {
   grpc_mdelem *add_initial_metadata;
   size_t add_initial_metadata_count;
 
-  gpr_timespec send_deadline;
+  grpc_millis send_deadline;
 } grpc_call_create_args;
 
 /* Create a new call based on \a args.
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 48962e5e45..0b5ca17cc4 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -265,7 +265,7 @@ static grpc_call *grpc_channel_create_call_internal(
     grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call,
     uint32_t propagation_mask, grpc_completion_queue *cq,
     grpc_pollset_set *pollset_set_alternative, grpc_mdelem path_mdelem,
-    grpc_mdelem authority_mdelem, gpr_timespec deadline) {
+    grpc_mdelem authority_mdelem, grpc_millis deadline) {
   grpc_mdelem send_metadata[2];
   size_t num_metadata = 0;
 
@@ -311,7 +311,7 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
       host != NULL ? grpc_mdelem_from_slices(&exec_ctx, GRPC_MDSTR_AUTHORITY,
                                              grpc_slice_ref_internal(*host))
                    : GRPC_MDNULL,
-      deadline);
+      grpc_timespec_to_millis_round_up(deadline));
   grpc_exec_ctx_finish(&exec_ctx);
   return call;
 }
@@ -319,7 +319,7 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
 grpc_call *grpc_channel_create_pollset_set_call(
     grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call,
     uint32_t propagation_mask, grpc_pollset_set *pollset_set, grpc_slice method,
-    const grpc_slice *host, gpr_timespec deadline, void *reserved) {
+    const grpc_slice *host, grpc_millis deadline, void *reserved) {
   GPR_ASSERT(!reserved);
   return grpc_channel_create_call_internal(
       exec_ctx, channel, parent_call, propagation_mask, NULL, pollset_set,
@@ -375,7 +375,8 @@ grpc_call *grpc_channel_create_registered_call(
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_call *call = grpc_channel_create_call_internal(
       &exec_ctx, channel, parent_call, propagation_mask, completion_queue, NULL,
-      GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority), deadline);
+      GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority),
+      grpc_timespec_to_millis_round_up(deadline));
   grpc_exec_ctx_finish(&exec_ctx);
   return call;
 }
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index 528bb868e2..827dd992b5 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -43,7 +43,7 @@ grpc_channel *grpc_channel_create_with_builder(
 grpc_call *grpc_channel_create_pollset_set_call(
     grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call,
     uint32_t propagation_mask, grpc_pollset_set *pollset_set, grpc_slice method,
-    const grpc_slice *host, gpr_timespec deadline, void *reserved);
+    const grpc_slice *host, grpc_millis deadline, void *reserved);
 
 /** Get a (borrowed) pointer to this channels underlying channel stack */
 grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index fed66e3a20..a05ac02b89 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -58,8 +58,7 @@ typedef struct {
   grpc_error *(*kick)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *specific_worker);
   grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
-                      grpc_pollset_worker **worker, gpr_timespec now,
-                      gpr_timespec deadline);
+                      grpc_pollset_worker **worker, grpc_millis deadline);
   void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                    grpc_closure *closure);
   void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset);
@@ -97,8 +96,7 @@ static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx,
 static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
                                            grpc_pollset *pollset,
                                            grpc_pollset_worker **worker,
-                                           gpr_timespec now,
-                                           gpr_timespec deadline) {
+                                           grpc_millis deadline) {
   non_polling_poller *npp = (non_polling_poller *)pollset;
   if (npp->shutdown) return GRPC_ERROR_NONE;
   non_polling_worker w;
@@ -112,7 +110,10 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
     w.next->prev = w.prev->next = &w;
   }
   w.kicked = false;
-  while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline))
+  gpr_timespec deadline_ts =
+      grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME);
+  while (!npp->shutdown && !w.kicked &&
+         !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
     ;
   if (&w == npp->root) {
     npp->root = w.next;
@@ -757,7 +758,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
 typedef struct {
   gpr_atm last_seen_things_queued_ever;
   grpc_completion_queue *cq;
-  gpr_timespec deadline;
+  grpc_millis deadline;
   grpc_cq_completion *stolen_completion;
   void *tag; /* for pluck */
   bool first_loop;
@@ -786,8 +787,7 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
       return true;
     }
   }
-  return !a->first_loop &&
-         gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
+  return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
 }
 
 #ifndef NDEBUG
@@ -816,7 +816,6 @@ static void dump_pending_tags(grpc_completion_queue *cq) {}
 static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
                           void *reserved) {
   grpc_event ret;
-  gpr_timespec now;
   cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
 
   GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@@ -833,23 +832,21 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
 
   dump_pending_tags(cq);
 
-  deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
-
   GRPC_CQ_INTERNAL_REF(cq, "next");
 
+  grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
   cq_is_finished_arg is_finished_arg = {
       .last_seen_things_queued_ever =
           gpr_atm_no_barrier_load(&cqd->things_queued_ever),
       .cq = cq,
-      .deadline = deadline,
+      .deadline = deadline_millis,
       .stolen_completion = NULL,
       .tag = NULL,
       .first_loop = true};
   grpc_exec_ctx exec_ctx =
       GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg);
-
   for (;;) {
-    gpr_timespec iteration_deadline = deadline;
+    grpc_millis iteration_deadline = deadline_millis;
 
     if (is_finished_arg.stolen_completion != NULL) {
       grpc_cq_completion *c = is_finished_arg.stolen_completion;
@@ -876,7 +873,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
            attempt at popping. Not doing this can potentially deadlock this
            thread forever (if the deadline is infinity) */
         if (cq_event_queue_num_items(&cqd->queue) > 0) {
-          iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
+          iteration_deadline = 0;
         }
       }
 
@@ -897,8 +894,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
       break;
     }
 
-    now = gpr_now(GPR_CLOCK_MONOTONIC);
-    if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
+    if (!is_finished_arg.first_loop &&
+        grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
       dump_pending_tags(cq);
@@ -909,7 +906,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
     gpr_mu_lock(cq->mu);
     cq->num_polls++;
     grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
-                                              NULL, now, iteration_deadline);
+                                              NULL, iteration_deadline);
     gpr_mu_unlock(cq->mu);
 
     if (err != GRPC_ERROR_NONE) {
@@ -1046,8 +1043,7 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
     }
     gpr_mu_unlock(cq->mu);
   }
-  return !a->first_loop &&
-         gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
+  return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
 }
 
 static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
@@ -1056,7 +1052,6 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
   grpc_cq_completion *c;
   grpc_cq_completion *prev;
   grpc_pollset_worker *worker = NULL;
-  gpr_timespec now;
   cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
 
   GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@@ -1075,15 +1070,14 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
 
   dump_pending_tags(cq);
 
-  deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
-
   GRPC_CQ_INTERNAL_REF(cq, "pluck");
   gpr_mu_lock(cq->mu);
+  grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
   cq_is_finished_arg is_finished_arg = {
       .last_seen_things_queued_ever =
          gpr_atm_no_barrier_load(&cqd->things_queued_ever),
      .cq = cq,
-      .deadline = deadline,
+      .deadline = deadline_millis,
      .stolen_completion = NULL,
      .tag = tag,
      .first_loop = true};
@@ -1135,8 +1129,8 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
       dump_pending_tags(cq);
       break;
     }
-    now = gpr_now(GPR_CLOCK_MONOTONIC);
-    if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
+    if (!is_finished_arg.first_loop &&
+        grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
       del_plucker(cq, tag, &worker);
       gpr_mu_unlock(cq->mu);
       memset(&ret, 0, sizeof(ret));
@@ -1144,10 +1138,9 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
       dump_pending_tags(cq);
       break;
     }
-
     cq->num_polls++;
     grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
-                                              &worker, now, deadline);
+                                              &worker, deadline_millis);
     if (err != GRPC_ERROR_NONE) {
       del_plucker(cq, tag, &worker);
       gpr_mu_unlock(cq->mu);
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index 6286f9159d..88e26cbeb7 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -74,7 +74,7 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
   mdb->list.head = &calld->status;
   mdb->list.tail = &calld->details;
   mdb->list.count = 2;
-  mdb->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+  mdb->deadline = GRPC_MILLIS_INF_FUTURE;
 }
 
 static void lame_start_transport_stream_op_batch(
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 1d0fd472d0..dd09cb91de 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -137,7 +137,7 @@ struct call_data {
   bool host_set;
   grpc_slice path;
   grpc_slice host;
-  gpr_timespec deadline;
+  grpc_millis deadline;
 
   grpc_completion_queue *cq_new;
 
@@ -492,11 +492,13 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
       GPR_ASSERT(calld->path_set);
       rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
       rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
-      rc->data.batch.details->deadline = calld->deadline;
+      rc->data.batch.details->deadline =
+          grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
       rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
       break;
     case REGISTERED_CALL:
-      *rc->data.registered.deadline = calld->deadline;
+      *rc->data.registered.deadline =
+          grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
       if (rc->data.registered.optional_payload) {
         *rc->data.registered.optional_payload = calld->payload;
         calld->payload = NULL;
@@ -739,7 +741,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
                                             grpc_error *error) {
   grpc_call_element *elem = (grpc_call_element *)ptr;
   call_data *calld = (call_data *)elem->call_data;
-  gpr_timespec op_deadline;
+  grpc_millis op_deadline;
 
   if (error == GRPC_ERROR_NONE) {
     GPR_ASSERT(calld->recv_initial_metadata->idx.named.path != NULL);
@@ -759,7 +761,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
     GRPC_ERROR_REF(error);
   }
   op_deadline = calld->recv_initial_metadata->deadline;
-  if (0 != gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) {
+  if (op_deadline != GRPC_MILLIS_INF_FUTURE) {
     calld->deadline = op_deadline;
   }
   if (calld->host_set && calld->path_set) {
@@ -833,7 +835,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
   memset(&args, 0, sizeof(args));
   args.channel = chand->channel;
   args.server_transport_data = transport_server_data;
-  args.send_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+  args.send_deadline = GRPC_MILLIS_INF_FUTURE;
   grpc_call *call;
   grpc_error *error = grpc_call_create(exec_ctx, &args, &call);
   grpc_call_element *elem =
@@ -881,7 +883,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
   call_data *calld = (call_data *)elem->call_data;
   channel_data *chand = (channel_data *)elem->channel_data;
   memset(calld, 0, sizeof(call_data));
-  calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+  calld->deadline = GRPC_MILLIS_INF_FUTURE;
   calld->call = grpc_call_from_top_element(elem);
   gpr_mu_init(&calld->mu_state);