aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/alarm.c3
-rw-r--r--src/core/lib/surface/call.c54
-rw-r--r--src/core/lib/surface/call.h2
-rw-r--r--src/core/lib/surface/channel.c9
-rw-r--r--src/core/lib/surface/channel.h2
-rw-r--r--src/core/lib/surface/completion_queue.c49
-rw-r--r--src/core/lib/surface/lame_client.cc2
-rw-r--r--src/core/lib/surface/server.c16
8 files changed, 65 insertions, 72 deletions
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index 7d60b1de17..4a81d55e6e 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -114,8 +114,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- &alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
+ grpc_timespec_to_millis_round_up(deadline), &alarm->on_alarm);
grpc_exec_ctx_finish(&exec_ctx);
return alarm;
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 3aa20ffcd7..4537bf3463 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -214,7 +214,7 @@ struct grpc_call {
server, it's trailing metadata */
grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
int send_extra_metadata_count;
- gpr_timespec send_deadline;
+ grpc_millis send_deadline;
grpc_slice_buffer_stream sending_stream;
@@ -281,7 +281,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
grpc_error *error);
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error);
-static void get_final_status(grpc_call *call,
+static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call,
void (*set_value)(grpc_status_code code,
void *user_data),
void *set_value_user_data, grpc_slice *details);
@@ -369,11 +369,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
}
for (i = 0; i < 2; i++) {
for (j = 0; j < 2; j++) {
- call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ call->metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
}
}
- gpr_timespec send_deadline =
- gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC);
+ grpc_millis send_deadline = args->send_deadline;
bool immediately_cancel = false;
@@ -391,10 +390,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&pc->child_list_mu);
if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
- send_deadline = gpr_time_min(
- gpr_convert_clock_type(send_deadline,
- args->parent_call->send_deadline.clock_type),
- args->parent_call->send_deadline);
+ send_deadline = GPR_MIN(send_deadline, args->parent_call->send_deadline);
}
/* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
* GRPC_PROPAGATE_STATS_CONTEXT */
@@ -549,8 +545,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
GRPC_CQ_INTERNAL_UNREF(exec_ctx, c->cq, "bind");
}
- get_final_status(call, set_status_value_directly, &c->final_info.final_status,
- NULL);
+ get_final_status(exec_ctx, call, set_status_value_directly,
+ &c->final_info.final_status, NULL);
c->final_info.stats.latency =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
@@ -736,13 +732,16 @@ static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
* FINAL STATUS CODE MANIPULATION
*/
-static bool get_final_status_from(
- grpc_call *call, grpc_error *error, bool allow_ok_status,
- void (*set_value)(grpc_status_code code, void *user_data),
- void *set_value_user_data, grpc_slice *details) {
+static bool get_final_status_from(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_error *error, bool allow_ok_status,
+ void (*set_value)(grpc_status_code code,
+ void *user_data),
+ void *set_value_user_data,
+ grpc_slice *details) {
grpc_status_code code;
grpc_slice slice = grpc_empty_slice();
- grpc_error_get_status(error, call->send_deadline, &code, &slice, NULL);
+ grpc_error_get_status(exec_ctx, error, call->send_deadline, &code, &slice,
+ NULL);
if (code == GRPC_STATUS_OK && !allow_ok_status) {
return false;
}
@@ -754,7 +753,7 @@ static bool get_final_status_from(
return true;
}
-static void get_final_status(grpc_call *call,
+static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call,
void (*set_value)(grpc_status_code code,
void *user_data),
void *set_value_user_data, grpc_slice *details) {
@@ -779,8 +778,9 @@ static void get_final_status(grpc_call *call,
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (status[i].is_set &&
grpc_error_has_clear_grpc_status(status[i].error)) {
- if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
- set_value, set_value_user_data, details)) {
+ if (get_final_status_from(exec_ctx, call, status[i].error,
+ allow_ok_status != 0, set_value,
+ set_value_user_data, details)) {
return;
}
}
@@ -788,8 +788,9 @@ static void get_final_status(grpc_call *call,
/* If no clearly defined status exists, search for 'anything' */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (status[i].is_set) {
- if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
- set_value, set_value_user_data, details)) {
+ if (get_final_status_from(exec_ctx, call, status[i].error,
+ allow_ok_status != 0, set_value,
+ set_value_user_data, details)) {
return;
}
}
@@ -1329,11 +1330,11 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
}
if (call->is_client) {
- get_final_status(call, set_status_value_directly,
+ get_final_status(exec_ctx, call, set_status_value_directly,
call->final_op.client.status,
call->final_op.client.status_details);
} else {
- get_final_status(call, set_cancelled_value,
+ get_final_status(exec_ctx, call, set_cancelled_value,
call->final_op.server.cancelled, NULL);
}
@@ -1609,11 +1610,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
validate_filtered_metadata(exec_ctx, bctl);
GPR_TIMER_END("validate_filtered_metadata", 0);
- if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
- 0 &&
- !call->is_client) {
- call->send_deadline =
- gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC);
+ if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
+ call->send_deadline = md->deadline;
}
}
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index d537637cbb..be362c8000 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -49,7 +49,7 @@ typedef struct grpc_call_create_args {
grpc_mdelem *add_initial_metadata;
size_t add_initial_metadata_count;
- gpr_timespec send_deadline;
+ grpc_millis send_deadline;
} grpc_call_create_args;
/* Create a new call based on \a args.
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 850fbe6a69..2cd9e9463b 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -259,7 +259,7 @@ static grpc_call *grpc_channel_create_call_internal(
grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call,
uint32_t propagation_mask, grpc_completion_queue *cq,
grpc_pollset_set *pollset_set_alternative, grpc_mdelem path_mdelem,
- grpc_mdelem authority_mdelem, gpr_timespec deadline) {
+ grpc_mdelem authority_mdelem, grpc_millis deadline) {
grpc_mdelem send_metadata[2];
size_t num_metadata = 0;
@@ -305,7 +305,7 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
host != NULL ? grpc_mdelem_from_slices(&exec_ctx, GRPC_MDSTR_AUTHORITY,
grpc_slice_ref_internal(*host))
: GRPC_MDNULL,
- deadline);
+ grpc_timespec_to_millis_round_up(deadline));
grpc_exec_ctx_finish(&exec_ctx);
return call;
}
@@ -313,7 +313,7 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_call *grpc_channel_create_pollset_set_call(
grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call,
uint32_t propagation_mask, grpc_pollset_set *pollset_set, grpc_slice method,
- const grpc_slice *host, gpr_timespec deadline, void *reserved) {
+ const grpc_slice *host, grpc_millis deadline, void *reserved) {
GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
exec_ctx, channel, parent_call, propagation_mask, NULL, pollset_set,
@@ -369,7 +369,8 @@ grpc_call *grpc_channel_create_registered_call(
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_call *call = grpc_channel_create_call_internal(
&exec_ctx, channel, parent_call, propagation_mask, completion_queue, NULL,
- GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority), deadline);
+ GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority),
+ grpc_timespec_to_millis_round_up(deadline));
grpc_exec_ctx_finish(&exec_ctx);
return call;
}
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index 528bb868e2..827dd992b5 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -43,7 +43,7 @@ grpc_channel *grpc_channel_create_with_builder(
grpc_call *grpc_channel_create_pollset_set_call(
grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call,
uint32_t propagation_mask, grpc_pollset_set *pollset_set, grpc_slice method,
- const grpc_slice *host, gpr_timespec deadline, void *reserved);
+ const grpc_slice *host, grpc_millis deadline, void *reserved);
/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 10e4e5ab0c..f72eeb5a01 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -57,8 +57,7 @@ typedef struct {
grpc_error *(*kick)(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker, gpr_timespec now,
- gpr_timespec deadline);
+ grpc_pollset_worker **worker, grpc_millis deadline);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset);
@@ -96,8 +95,7 @@ static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx,
static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
grpc_pollset_worker **worker,
- gpr_timespec now,
- gpr_timespec deadline) {
+ grpc_millis deadline) {
non_polling_poller *npp = (non_polling_poller *)pollset;
if (npp->shutdown) return GRPC_ERROR_NONE;
non_polling_worker w;
@@ -111,7 +109,10 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
w.next->prev = w.prev->next = &w;
}
w.kicked = false;
- while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline))
+ gpr_timespec deadline_ts =
+ grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME);
+ while (!npp->shutdown && !w.kicked &&
+ !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
;
if (&w == npp->root) {
npp->root = w.next;
@@ -759,7 +760,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
typedef struct {
gpr_atm last_seen_things_queued_ever;
grpc_completion_queue *cq;
- gpr_timespec deadline;
+ grpc_millis deadline;
grpc_cq_completion *stolen_completion;
void *tag; /* for pluck */
bool first_loop;
@@ -788,8 +789,7 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
return true;
}
}
- return !a->first_loop &&
- gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
+ return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
}
#ifndef NDEBUG
@@ -818,7 +818,6 @@ static void dump_pending_tags(grpc_completion_queue *cq) {}
static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved) {
grpc_event ret;
- gpr_timespec now;
cq_next_data *cqd = DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@@ -835,23 +834,21 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
dump_pending_tags(cq);
- deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
-
GRPC_CQ_INTERNAL_REF(cq, "next");
+ grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
.cq = cq,
- .deadline = deadline,
+ .deadline = deadline_millis,
.stolen_completion = NULL,
.tag = NULL,
.first_loop = true};
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg);
-
for (;;) {
- gpr_timespec iteration_deadline = deadline;
+ grpc_millis iteration_deadline = deadline_millis;
if (is_finished_arg.stolen_completion != NULL) {
grpc_cq_completion *c = is_finished_arg.stolen_completion;
@@ -878,7 +875,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
attempt at popping. Not doing this can potentially deadlock this
thread forever (if the deadline is infinity) */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
- iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
+ iteration_deadline = 0;
}
}
@@ -899,8 +896,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
break;
}
- now = gpr_now(GPR_CLOCK_MONOTONIC);
- if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
+ if (!is_finished_arg.first_loop &&
+ grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
dump_pending_tags(cq);
@@ -911,7 +908,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
gpr_mu_lock(cq->mu);
cq->num_polls++;
grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
- NULL, now, iteration_deadline);
+ NULL, iteration_deadline);
gpr_mu_unlock(cq->mu);
if (err != GRPC_ERROR_NONE) {
@@ -1045,8 +1042,7 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
}
gpr_mu_unlock(cq->mu);
}
- return !a->first_loop &&
- gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
+ return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
}
static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
@@ -1055,7 +1051,6 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker *worker = NULL;
- gpr_timespec now;
cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@@ -1074,15 +1069,14 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
dump_pending_tags(cq);
- deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
-
GRPC_CQ_INTERNAL_REF(cq, "pluck");
gpr_mu_lock(cq->mu);
+ grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
.cq = cq,
- .deadline = deadline,
+ .deadline = deadline_millis,
.stolen_completion = NULL,
.tag = tag,
.first_loop = true};
@@ -1134,8 +1128,8 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
dump_pending_tags(cq);
break;
}
- now = gpr_now(GPR_CLOCK_MONOTONIC);
- if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
+ if (!is_finished_arg.first_loop &&
+ grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
@@ -1143,10 +1137,9 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
dump_pending_tags(cq);
break;
}
-
cq->num_polls++;
grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
- &worker, now, deadline);
+ &worker, deadline_millis);
if (err != GRPC_ERROR_NONE) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index 6286f9159d..88e26cbeb7 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -74,7 +74,7 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
mdb->list.head = &calld->status;
mdb->list.tail = &calld->details;
mdb->list.count = 2;
- mdb->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ mdb->deadline = GRPC_MILLIS_INF_FUTURE;
}
static void lame_start_transport_stream_op_batch(
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 8582d826ca..84cd1e4bfd 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -136,7 +136,7 @@ struct call_data {
bool host_set;
grpc_slice path;
grpc_slice host;
- gpr_timespec deadline;
+ grpc_millis deadline;
grpc_completion_queue *cq_new;
@@ -489,11 +489,13 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
GPR_ASSERT(calld->path_set);
rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
- rc->data.batch.details->deadline = calld->deadline;
+ rc->data.batch.details->deadline =
+ grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_REALTIME);
rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
break;
case REGISTERED_CALL:
- *rc->data.registered.deadline = calld->deadline;
+ *rc->data.registered.deadline =
+ grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_REALTIME);
if (rc->data.registered.optional_payload) {
*rc->data.registered.optional_payload = calld->payload;
calld->payload = NULL;
@@ -734,7 +736,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
grpc_error *error) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
- gpr_timespec op_deadline;
+ grpc_millis op_deadline;
if (error == GRPC_ERROR_NONE) {
GPR_ASSERT(calld->recv_initial_metadata->idx.named.path != NULL);
@@ -754,7 +756,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
GRPC_ERROR_REF(error);
}
op_deadline = calld->recv_initial_metadata->deadline;
- if (0 != gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) {
+ if (op_deadline != GRPC_MILLIS_INF_FUTURE) {
calld->deadline = op_deadline;
}
if (calld->host_set && calld->path_set) {
@@ -828,7 +830,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
memset(&args, 0, sizeof(args));
args.channel = chand->channel;
args.server_transport_data = transport_server_data;
- args.send_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ args.send_deadline = GRPC_MILLIS_INF_FUTURE;
grpc_call *call;
grpc_error *error = grpc_call_create(exec_ctx, &args, &call);
grpc_call_element *elem =
@@ -876,7 +878,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
- calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ calld->deadline = GRPC_MILLIS_INF_FUTURE;
calld->call = grpc_call_from_top_element(elem);
gpr_mu_init(&calld->mu_state);