diff options
author | Craig Tiller <ctiller@google.com> | 2017-04-04 08:29:43 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-04-04 08:29:43 -0700 |
commit | 61666f5557b343fb77d2a250dcdbede253a54d88 (patch) | |
tree | 817040b0fef4c1a9b44259d00a9e5cc6166c4ff2 /src | |
parent | 6696894e6a3e02170a560e0abcea1b4e948d583f (diff) | |
parent | 0df90f049ad8ee0da6e47bbe89650fcf9bee25e2 (diff) |
Merge branch 'lazy-batches' into lazy-parent
Diffstat (limited to 'src')
87 files changed, 1102 insertions, 452 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c index 8c3d450dc1..bcf59a4efe 100644 --- a/src/core/ext/census/grpc_filter.c +++ b/src/core/ext/census/grpc_filter.c @@ -74,7 +74,7 @@ static void extract_and_annotate_method_tag(grpc_metadata_batch *md, } static void client_mutate_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; if (op->send_initial_metadata) { @@ -85,7 +85,7 @@ static void client_mutate_op(grpc_call_element *elem, static void client_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { client_mutate_op(elem, op); grpc_call_next_op(exec_ctx, elem, op); } @@ -104,7 +104,7 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr, } static void server_mutate_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; if (op->recv_initial_metadata) { /* substitute our callback for the op callback */ @@ -119,7 +119,7 @@ static void server_mutate_op(grpc_call_element *elem, static void server_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { /* TODO(ctiller): this code fails. I don't know why. I expect it's incomplete, and someone should look at it soon. diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 6e3b472527..477d0e792b 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -374,8 +374,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, // resolver actually specified. channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); - if (channel_arg != NULL) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); + if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) { grpc_lb_addresses *addresses = channel_arg->value.pointer.p; bool found_backend_address = false; for (size_t i = 0; i < addresses->num_addresses; ++i) { @@ -643,14 +642,26 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, // Record client channel factory. const grpc_arg *arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_CLIENT_CHANNEL_FACTORY); - GPR_ASSERT(arg != NULL); - GPR_ASSERT(arg->type == GRPC_ARG_POINTER); + if (arg == NULL) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Missing client channel factory in args for client channel filter"); + } + if (arg->type != GRPC_ARG_POINTER) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "client channel factory arg must be a pointer"); + } grpc_client_channel_factory_ref(arg->value.pointer.p); chand->client_channel_factory = arg->value.pointer.p; // Get server name to resolve, using proxy mapper if needed. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); - GPR_ASSERT(arg != NULL); - GPR_ASSERT(arg->type == GRPC_ARG_STRING); + if (arg == NULL) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Missing server uri in args for client channel filter"); + } + if (arg->type != GRPC_ARG_STRING) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "server uri arg must be a string"); + } char *proxy_name = NULL; grpc_channel_args *new_args = NULL; grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args, @@ -755,7 +766,7 @@ typedef struct client_channel_call_data { grpc_connected_subchannel *connected_subchannel; grpc_polling_entity *pollent; - grpc_transport_stream_op **waiting_ops; + grpc_transport_stream_op_batch **waiting_ops; size_t waiting_ops_count; size_t waiting_ops_capacity; @@ -775,7 +786,8 @@ grpc_subchannel_call *grpc_client_channel_get_subchannel_call( return scc == CANCELLED_CALL ? NULL : scc; } -static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) { +static void add_waiting_locked(call_data *calld, + grpc_transport_stream_op_batch *op) { GPR_TIMER_BEGIN("add_waiting_locked", 0); if (calld->waiting_ops_count == calld->waiting_ops_capacity) { calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity); @@ -791,7 +803,7 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, grpc_error *error) { size_t i; for (i = 0; i < calld->waiting_ops_count; i++) { - grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error)); } calld->waiting_ops_count = 0; @@ -804,7 +816,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { } grpc_subchannel_call *call = GET_CALL(calld); - grpc_transport_stream_op **ops = calld->waiting_ops; + grpc_transport_stream_op_batch **ops = calld->waiting_ops; size_t nops = calld->waiting_ops_count; if (call == CANCELLED_CALL) { fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED); @@ -1052,9 +1064,9 @@ static bool pick_subchannel_locked( return false; } -static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, - grpc_transport_stream_op *op, - grpc_call_element *elem) { +static void start_transport_stream_op_batch_locked_inner( + grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, + grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; grpc_subchannel_call *call; @@ -1062,7 +1074,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, /* need to recheck that another thread hasn't set the call */ call = GET_CALL(calld); if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); /* early out */ return; @@ -1077,7 +1089,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, if (!gpr_atm_rel_cas(&calld->subchannel_call, 0, (gpr_atm)(uintptr_t)CANCELLED_CALL)) { /* recurse to retry */ - start_transport_stream_op_locked_inner(exec_ctx, op, elem); + start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem); /* early out */ return; } else { @@ -1099,7 +1111,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); break; } - grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); /* early out */ @@ -1143,13 +1155,13 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); } gpr_atm_rel_store(&calld->subchannel_call, (gpr_atm)(uintptr_t)subchannel_call); retry_waiting_locked(exec_ctx, calld); /* recurse to retry */ - start_transport_stream_op_locked_inner(exec_ctx, op, elem); + start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem); /* early out */ return; } @@ -1177,11 +1189,12 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GRPC_ERROR_REF(error)); } -static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error_ignored) { - GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0); +static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_ignored) { + GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0); - grpc_transport_stream_op *op = arg; + grpc_transport_stream_op_batch *op = arg; grpc_call_element *elem = op->handler_private.extra_arg; call_data *calld = elem->call_data; @@ -1193,11 +1206,11 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, op->on_complete = &calld->on_complete; } - start_transport_stream_op_locked_inner(exec_ctx, op, elem); + start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, - "start_transport_stream_op"); - GPR_TIMER_END("start_transport_stream_op_locked", 0); + "start_transport_stream_op_batch"); + GPR_TIMER_END("start_transport_stream_op_batch_locked", 0); } /* The logic here is fairly complicated, due to (a) the fact that we @@ -1208,39 +1221,40 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, We use double-checked locking to initially see if initialization has been performed. If it has not, we acquire the combiner and perform initialization. If it has, we proceed on the fast path. */ -static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void cc_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); + grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, + op); /* try to (atomically) get the call */ grpc_subchannel_call *call = GET_CALL(calld); - GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); + GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); /* early out */ return; } if (call != NULL) { grpc_subchannel_call_process_op(exec_ctx, call, op); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); /* early out */ return; } /* we failed; lock and figure out what to do */ - GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op"); + GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch"); op->handler_private.extra_arg = elem; grpc_closure_sched( exec_ctx, grpc_closure_init(&op->handler_private.closure, - start_transport_stream_op_locked, op, + start_transport_stream_op_batch_locked, op, grpc_combiner_scheduler(chand->combiner, false)), GRPC_ERROR_NONE); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ @@ -1299,7 +1313,7 @@ static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, */ const grpc_channel_filter grpc_client_channel_filter = { - cc_start_transport_stream_op, + cc_start_transport_stream_op_batch, cc_start_transport_op, sizeof(call_data), cc_init_call_elem, diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 063c0badff..3a2cb00ca7 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -360,7 +360,6 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, for (size_t i = 0; i < c->args->num_args; i++) { if (0 == strcmp(c->args->args[i].key, "grpc.testing.fixed_reconnect_backoff_ms")) { - GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); fixed_reconnect_backoff = true; initial_backoff_ms = min_backoff_ms = max_backoff_ms = grpc_channel_arg_get_integer( @@ -749,11 +748,11 @@ char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0); grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); - top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op); + top_elem->filter->start_transport_stream_op_batch(exec_ctx, top_elem, op); GPR_TIMER_END("grpc_subchannel_call_process_op", 0); } diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 3e64a2507c..ba96c92df8 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -157,7 +157,7 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( /** continue processing a transport op */ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call, - grpc_transport_stream_op *op); + grpc_transport_stream_op_batch *op); /** continue querying for peer */ char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 601b0e643b..629d81953c 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -847,7 +847,9 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, * this is the right LB policy to use. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + return NULL; + } grpc_lb_addresses *addresses = arg->value.pointer.p; size_t num_grpclb_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index fc65dfdcb9..8c73ea6014 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -402,7 +402,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, * addresses, since we don't know how to handle them. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + return NULL; + } grpc_lb_addresses *addresses = arg->value.pointer.p; size_t num_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index a62082a2ff..8665b5c6c8 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -691,7 +691,9 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, * addresses, since we don't know how to handle them. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + return NULL; + } grpc_lb_addresses *addresses = arg->value.pointer.p; size_t num_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index cb6bc95dd3..2e7b3580c3 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -183,10 +183,10 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, */ } -static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { - GPR_TIMER_BEGIN("lr_start_transport_stream_op", 0); +static void lr_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { + GPR_TIMER_BEGIN("lr_start_transport_stream_op_batch", 0); call_data *calld = elem->call_data; if (op->recv_initial_metadata) { @@ -200,11 +200,11 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } grpc_call_next_op(exec_ctx, elem, op); - GPR_TIMER_END("lr_start_transport_stream_op", 0); + GPR_TIMER_END("lr_start_transport_stream_op_batch", 0); } const grpc_channel_filter grpc_load_reporting_filter = { - lr_start_transport_stream_op, + lr_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 9979cf8eaa..a8e320d037 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1102,7 +1102,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, return; /* early out */ } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, &s->fetching_slice, UINT32_MAX, - &s->complete_fetch)) { + &s->complete_fetch_locked)) { add_fetched_slice_locked(exec_ctx, t, s); } } @@ -1140,13 +1140,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_error *error_ignored) { GPR_TIMER_BEGIN("perform_stream_op_locked", 0); - grpc_transport_stream_op *op = stream_op; + grpc_transport_stream_op_batch *op = stream_op; grpc_chttp2_stream *s = op->handler_private.extra_arg; - grpc_transport_stream_op_payload *op_payload = op->payload; + grpc_transport_stream_op_batch_payload *op_payload = op->payload; grpc_chttp2_transport *t = s->t; if (grpc_http_trace) { - char *str = grpc_transport_stream_op_string(op); + char *str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str, op->on_complete); gpr_free(str); @@ -1321,7 +1321,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_trailing_metadata_finished, grpc_metadata_batch_is_empty( - op->payload->send_initial_metadata.send_initial_metadata) + op->payload->send_trailing_metadata.send_trailing_metadata) ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Attempt to send trailing metadata after " @@ -1374,13 +1374,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_transport_stream_op *op) { + grpc_stream *gs, + grpc_transport_stream_op_batch *op) { GPR_TIMER_BEGIN("perform_stream_op", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; if (grpc_http_trace) { - char *str = grpc_transport_stream_op_string(op); + char *str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_DEBUG, "perform_stream_op[s=%p/%d]: %s", s, s->id, str); gpr_free(str); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 8b718e963c..50993e4ecd 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -452,7 +452,6 @@ struct grpc_chttp2_stream { int64_t next_message_end_offset; int64_t flow_controlled_bytes_written; bool complete_fetch_covered_by_poller; - grpc_closure complete_fetch; grpc_closure complete_fetch_locked; grpc_closure *fetching_send_message_finished; diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index fa6129f835..9bd8914b98 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -172,7 +172,7 @@ struct op_state { }; struct op_and_state { - grpc_transport_stream_op op; + grpc_transport_stream_op_batch op; struct op_state state; bool done; struct stream_obj *s; /* Pointer back to the stream object */ @@ -187,7 +187,7 @@ struct op_storage { struct stream_obj { gpr_arena *arena; struct op_and_state *oas; - grpc_transport_stream_op *curr_op; + grpc_transport_stream_op_batch *curr_op; grpc_cronet_transport *curr_ct; grpc_stream *curr_gs; bidirectional_stream *cbs; @@ -298,12 +298,13 @@ static grpc_error *make_error_with_desc(int error_code, const char *desc) { /* Add a new stream op to op storage. */ -static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { +static void add_to_storage(struct stream_obj *s, + grpc_transport_stream_op_batch *op) { struct op_storage *storage = &s->storage; /* add new op at the beginning of the linked list. The memory is freed in remove_from_storage */ struct op_and_state *new_op = gpr_malloc(sizeof(struct op_and_state)); - memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op)); + memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch)); memset(&new_op->state, 0, sizeof(new_op->state)); new_op->s = s; new_op->done = false; @@ -768,7 +769,7 @@ static bool header_has_authority(grpc_linked_mdelem *head) { Op Execution: Decide if one of the actions contained in the stream op can be executed. This is the heart of the state machine. */ -static bool op_can_be_run(grpc_transport_stream_op *curr_op, +static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, struct stream_obj *s, struct op_state *op_state, enum e_op_id op_id) { struct op_state *stream_state = &s->state; @@ -919,7 +920,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, */ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, struct op_and_state *oas) { - grpc_transport_stream_op *stream_op = &oas->op; + grpc_transport_stream_op_batch *stream_op = &oas->op; struct stream_obj *s = oas->s; grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; struct op_state *stream_state = &s->state; @@ -1037,18 +1038,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } else if (stream_state->state_callback_received[OP_FAILED]) { -<<<<<<< HEAD grpc_closure_sched( exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); -======= - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); ->>>>>>> 739cecb0bc1f1ba3b2e0b390795cbaf429ec81c2 + grpc_closure_sched( + exec_ctx, + stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); } else { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &oas->s->state.rs.initial_metadata, @@ -1304,7 +1302,8 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set) {} static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_transport_stream_op *op) { + grpc_stream *gs, + grpc_transport_stream_op_batch *op) { CRONET_LOG(GPR_DEBUG, "perform_stream_op"); if (op->send_initial_metadata && header_has_authority(op->payload->send_initial_metadata diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 479529d489..94382980eb 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -246,9 +246,9 @@ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, } void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { grpc_call_element *next_elem = elem + 1; - next_elem->filter->start_transport_stream_op(exec_ctx, next_elem, op); + next_elem->filter->start_transport_stream_op_batch(exec_ctx, next_elem, op); } char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, @@ -284,8 +284,8 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_error *error) { - grpc_transport_stream_op *op = grpc_make_transport_stream_op(NULL); + grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(NULL); op->cancel_stream = true; op->payload->cancel_stream.cancel_error = error; - elem->filter->start_transport_stream_op(exec_ctx, elem, op); + elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op); } diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 80e3603e8d..fdbcbdb018 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -112,9 +112,9 @@ typedef struct { typedef struct { /* Called to eg. send/receive data on a call. See grpc_call_next_op on how to call the next element in the stack */ - void (*start_transport_stream_op)(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op); + void (*start_transport_stream_op_batch)(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *op); /* Called to handle channel level operations - e.g. new calls, or transport closure. See grpc_channel_next_op on how to call the next element in the stack */ @@ -281,7 +281,7 @@ void grpc_call_stack_ignore_set_pollset_or_pollset_set( grpc_polling_entity *pollent); /* Call the next operation in a call stack */ void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op); + grpc_transport_stream_op_batch *op); /* Call the next operation (depending on call directionality) in a channel stack */ void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, @@ -300,7 +300,8 @@ grpc_channel_stack *grpc_channel_stack_from_top_element( grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem); void grpc_call_log_op(char *file, int line, gpr_log_severity severity, - grpc_call_element *elem, grpc_transport_stream_op *op); + grpc_call_element *elem, + grpc_transport_stream_op_batch *op); void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx, grpc_call_element *cur_elem, diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 0f4c2e9aee..4625cba0d2 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -62,7 +62,7 @@ typedef struct call_data { /** If true, contents of \a compression_algorithm are authoritative */ int has_compression_algorithm; - grpc_transport_stream_op *send_op; + grpc_transport_stream_op_batch *send_op; uint32_t send_length; uint32_t send_flags; grpc_slice incoming_slice; @@ -243,19 +243,19 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, } } -static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void compress_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; - GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0); + GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); if (op->send_initial_metadata) { grpc_error *error = process_send_initial_metadata( exec_ctx, elem, op->payload->send_initial_metadata.send_initial_metadata); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); return; } } @@ -270,7 +270,7 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_next_op(exec_ctx, elem, op); } - GPR_TIMER_END("compress_start_transport_stream_op", 0); + GPR_TIMER_END("compress_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ @@ -339,7 +339,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} const grpc_channel_filter grpc_compress_filter = { - compress_start_transport_stream_op, + compress_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 75c68a5534..22caf24373 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -62,9 +62,9 @@ typedef struct connected_channel_call_data { void *unused; } call_data; /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ -static void con_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void con_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -142,7 +142,7 @@ static void con_get_channel_info(grpc_exec_ctx *exec_ctx, const grpc_channel_info *channel_info) {} const grpc_channel_filter grpc_connected_filter = { - con_start_transport_stream_op, + con_start_transport_stream_op_batch, con_start_transport_op, sizeof(call_data), init_call_elem, diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 939ed21677..fda099b021 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -134,7 +134,7 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { // Inject our own on_complete callback into op. static void inject_on_complete_cb(grpc_deadline_state* deadline_state, - grpc_transport_stream_op* op) { + grpc_transport_stream_op_batch* op) { deadline_state->next_on_complete = op->on_complete; grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state, grpc_schedule_on_exec_ctx); @@ -196,9 +196,9 @@ void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, start_timer_if_needed(exec_ctx, elem, new_deadline); } -void grpc_deadline_state_client_start_transport_stream_op( +void grpc_deadline_state_client_start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op* op) { + grpc_transport_stream_op_batch* op) { grpc_deadline_state* deadline_state = elem->call_data; if (op->cancel_stream) { cancel_timer_if_needed(exec_ctx, deadline_state); @@ -261,10 +261,11 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, } // Method for starting a call op for client filter. -static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - grpc_transport_stream_op* op) { - grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); +static void client_start_transport_stream_op_batch( + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* op) { + grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, + op); // Chain to next filter. grpc_call_next_op(exec_ctx, elem, op); } @@ -282,9 +283,9 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg, } // Method for starting a call op for server filter. -static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - grpc_transport_stream_op* op) { +static void server_start_transport_stream_op_batch( + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* op) { server_call_data* calld = elem->call_data; if (op->cancel_stream) { cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state); @@ -317,7 +318,7 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx, } const grpc_channel_filter grpc_client_deadline_filter = { - client_start_transport_stream_op, + client_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(base_call_data), init_call_elem, @@ -332,7 +333,7 @@ const grpc_channel_filter grpc_client_deadline_filter = { }; const grpc_channel_filter grpc_server_deadline_filter = { - server_start_transport_stream_op, + server_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(server_call_data), init_call_elem, diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index 72cd5cb929..d8db9a9f97 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -83,15 +83,15 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec new_deadline); -// To be called from the client-side filter's start_transport_stream_op() +// To be called from the client-side filter's start_transport_stream_op_batch() // method. Ensures that the deadline timer is cancelled when the call // is completed. // // Note: It is the caller's responsibility to chain to the next filter if // necessary after this function returns. -void grpc_deadline_state_client_start_transport_stream_op( +void grpc_deadline_state_client_start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op* op); + grpc_transport_stream_op_batch* op); // Deadline filters for direct client channels and server channels. // Note: Deadlines for non-direct client channels are handled by the diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index e43b97335c..4e47c5c658 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -36,7 +36,7 @@ #include <grpc/support/string_util.h> #include <string.h> #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/security/util/b64.h" +#include "src/core/lib/slice/b64.h" #include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -63,7 +63,7 @@ typedef struct call_data { uint8_t *payload_bytes; /* Vars to read data off of send_message */ - grpc_transport_stream_op *send_op; + grpc_transport_stream_op_batch *send_op; uint32_t send_length; uint32_t send_flags; grpc_slice incoming_slice; @@ -254,7 +254,7 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -348,7 +348,7 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, calld->on_complete = op->on_complete; op->on_complete = &calld->hc_on_complete; - op->send_message = NULL; + op->send_message = false; grpc_slice_unref_internal(exec_ctx, path_with_query_slice); } else { /* Not all data is available. Fall back to POST. */ @@ -422,12 +422,12 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { GPR_TIMER_BEGIN("hc_start_transport_op", 0); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); grpc_error *error = hc_mutate_op(exec_ctx, elem, op); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); } else { call_data *calld = elem->call_data; if (op->send_message && calld->send_message_blocked) { diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index 4217d93645..c1e49ffacc 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -37,7 +37,7 @@ #include <grpc/support/log.h> #include <string.h> #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/security/util/b64.h" +#include "src/core/lib/slice/b64.h" #include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -128,7 +128,7 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, GRPC_MDELEM_METHOD_GET)) { *calld->recv_initial_metadata_flags |= GRPC_INITIAL_METADATA_CACHEABLE_REQUEST; - *calld->recv_initial_metadata_flags |= + *calld->recv_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; } else { add_error(error_name, &error, @@ -222,7 +222,7 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, size_t path_length = GRPC_SLICE_LENGTH(path_slice); /* offset of the character '?' */ size_t offset = 0; - for (offset = 0; *path_ptr != k_query_separator && offset < path_length; + for (offset = 0; offset < path_length && *path_ptr != k_query_separator; path_ptr++, offset++) ; if (offset < path_length) { @@ -318,7 +318,7 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, } static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; @@ -341,7 +341,7 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, exec_ctx, elem, op->payload->send_initial_metadata.send_initial_metadata)); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); return; } } @@ -377,7 +377,7 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, exec_ctx, elem, op->payload->send_trailing_metadata.send_trailing_metadata); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); return; } } @@ -385,7 +385,7 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); GPR_TIMER_BEGIN("hs_start_transport_op", 0); hs_mutate_op(exec_ctx, elem, op); diff --git a/src/core/lib/channel/max_age_filter.c b/src/core/lib/channel/max_age_filter.c new file mode 100644 index 0000000000..c25481486c --- /dev/null +++ b/src/core/lib/channel/max_age_filter.c @@ -0,0 +1,386 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/channel/message_size_filter.h" + +#include <limits.h> +#include <string.h> + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/transport/http2_errors.h" +#include "src/core/lib/transport/service_config.h" + +#define DEFAULT_MAX_CONNECTION_AGE_MS INT_MAX +#define DEFAULT_MAX_CONNECTION_AGE_GRACE_MS INT_MAX +#define DEFAULT_MAX_CONNECTION_IDLE_MS INT_MAX + +typedef struct channel_data { + /* We take a reference to the channel stack for the timer callback */ + grpc_channel_stack* channel_stack; + /* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer + and max_age_grace_timer_pending */ + gpr_mu max_age_timer_mu; + /* True if the max_age timer callback is currently pending */ + bool max_age_timer_pending; + /* True if the max_age_grace timer callback is currently pending */ + bool max_age_grace_timer_pending; + /* The timer for checking if the channel has reached its max age */ + grpc_timer max_age_timer; + /* The timer for checking if the max-aged channel has uesed up the grace + period */ + grpc_timer max_age_grace_timer; + /* The timer for checking if the channel's idle duration reaches + max_connection_idle */ + grpc_timer max_idle_timer; + /* Allowed max time a channel may have no outstanding rpcs */ + gpr_timespec max_connection_idle; + /* Allowed max time a channel may exist */ + gpr_timespec max_connection_age; + /* Allowed grace period after the channel reaches its max age */ + gpr_timespec max_connection_age_grace; + /* Closure to run when the channel's idle duration reaches max_connection_idle + and should be closed gracefully */ + grpc_closure close_max_idle_channel; + /* Closure to run when the channel reaches its max age and should be closed + gracefully */ + grpc_closure close_max_age_channel; + /* Closure to run the channel uses up its max age grace time and should be + closed forcibly */ + grpc_closure force_close_max_age_channel; + /* Closure to run when the init fo channel stack is done and the max_idle + timer should be started */ + grpc_closure start_max_idle_timer_after_init; + /* Closure to run when the init fo channel stack is done and the max_age timer + should be started */ + grpc_closure start_max_age_timer_after_init; + /* Closure to run when the goaway op is finished and the max_age_timer */ + grpc_closure start_max_age_grace_timer_after_goaway_op; + /* Closure to run when the channel connectivity state changes */ + grpc_closure channel_connectivity_changed; + /* Records the current connectivity state */ + grpc_connectivity_state connectivity_state; + /* Number of active calls */ + gpr_atm call_count; +} channel_data; + +/* Increase the nubmer of active calls. Before the increasement, if there are no + calls, the max_idle_timer should be cancelled. */ +static void increase_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) { + if (gpr_atm_full_fetch_add(&chand->call_count, 1) == 0) { + grpc_timer_cancel(exec_ctx, &chand->max_idle_timer); + } +} + +/* Decrease the nubmer of active calls. After the decrement, if there are no + calls, the max_idle_timer should be started. */ +static void decrease_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) { + if (gpr_atm_full_fetch_add(&chand->call_count, -1) == 1) { + GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_idle_timer"); + grpc_timer_init( + exec_ctx, &chand->max_idle_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_idle), + &chand->close_max_idle_channel, gpr_now(GPR_CLOCK_MONOTONIC)); + } +} + +static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + channel_data* chand = arg; + /* Decrease call_count. If there are no active calls at this time, + max_idle_timer will start here. If the number of active calls is not 0, + max_idle_timer will start after all the active calls end. */ + decrease_call_count(exec_ctx, chand); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack, + "max_age start_max_idle_timer_after_init"); +} + +static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + channel_data* chand = arg; + gpr_mu_lock(&chand->max_age_timer_mu); + chand->max_age_timer_pending = true; + GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer"); + grpc_timer_init( + exec_ctx, &chand->max_age_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_age), + &chand->close_max_age_channel, gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_mu_unlock(&chand->max_age_timer_mu); + grpc_transport_op* op = grpc_make_transport_op(NULL); + op->on_connectivity_state_change = &chand->channel_connectivity_changed, + op->connectivity_state = &chand->connectivity_state; + grpc_channel_next_op(exec_ctx, + grpc_channel_stack_element(chand->channel_stack, 0), op); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack, + "max_age start_max_age_timer_after_init"); +} + +static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx, + void* arg, + grpc_error* error) { + channel_data* chand = arg; + gpr_mu_lock(&chand->max_age_timer_mu); + chand->max_age_grace_timer_pending = true; + GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer"); + grpc_timer_init(exec_ctx, &chand->max_age_grace_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + chand->max_connection_age_grace), + &chand->force_close_max_age_channel, + gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_mu_unlock(&chand->max_age_timer_mu); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack, + "max_age start_max_age_grace_timer_after_goaway_op"); +} + +static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + channel_data* chand = arg; + gpr_atm_no_barrier_fetch_add(&chand->call_count, 1); + if (error == GRPC_ERROR_NONE) { + grpc_transport_op* op = grpc_make_transport_op(NULL); + op->goaway_error = + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_idle"), + GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR); + grpc_channel_element* elem = + grpc_channel_stack_element(chand->channel_stack, 0); + elem->filter->start_transport_op(exec_ctx, elem, op); + } else if (error != GRPC_ERROR_CANCELLED) { + GRPC_LOG_IF_ERROR("close_max_idle_channel", error); + } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack, + "max_age max_idle_timer"); +} + +static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + channel_data* chand = arg; + gpr_mu_lock(&chand->max_age_timer_mu); + chand->max_age_timer_pending = false; + gpr_mu_unlock(&chand->max_age_timer_mu); + if (error == GRPC_ERROR_NONE) { + GRPC_CHANNEL_STACK_REF(chand->channel_stack, + "max_age start_max_age_grace_timer_after_goaway_op"); + grpc_transport_op* op = grpc_make_transport_op( + &chand->start_max_age_grace_timer_after_goaway_op); + op->goaway_error = + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_age"), + GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR); + grpc_channel_element* elem = + grpc_channel_stack_element(chand->channel_stack, 0); + elem->filter->start_transport_op(exec_ctx, elem, op); + } else if (error != GRPC_ERROR_CANCELLED) { + GRPC_LOG_IF_ERROR("close_max_age_channel", error); + } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack, + "max_age max_age_timer"); +} + +static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + channel_data* chand = arg; + gpr_mu_lock(&chand->max_age_timer_mu); + chand->max_age_grace_timer_pending = false; + gpr_mu_unlock(&chand->max_age_timer_mu); + if (error == GRPC_ERROR_NONE) { + grpc_transport_op* op = grpc_make_transport_op(NULL); + op->disconnect_with_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel reaches max age"); + grpc_channel_element* elem = + grpc_channel_stack_element(chand->channel_stack, 0); + elem->filter->start_transport_op(exec_ctx, elem, op); + } else if (error != GRPC_ERROR_CANCELLED) { + GRPC_LOG_IF_ERROR("force_close_max_age_channel", error); + } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack, + "max_age max_age_grace_timer"); +} + +static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + channel_data* chand = arg; + if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { + grpc_transport_op* op = grpc_make_transport_op(NULL); + op->on_connectivity_state_change = &chand->channel_connectivity_changed, + op->connectivity_state = &chand->connectivity_state; + grpc_channel_next_op( + exec_ctx, grpc_channel_stack_element(chand->channel_stack, 0), op); + } else { + gpr_mu_lock(&chand->max_age_timer_mu); + if (chand->max_age_timer_pending) { + grpc_timer_cancel(exec_ctx, &chand->max_age_timer); + chand->max_age_timer_pending = false; + } + if (chand->max_age_grace_timer_pending) { + grpc_timer_cancel(exec_ctx, &chand->max_age_grace_timer); + chand->max_age_grace_timer_pending = false; + } + gpr_mu_unlock(&chand->max_age_timer_mu); + /* If there are no active calls, this increasement will cancel + max_idle_timer, and prevent max_idle_timer from being started in the + future. */ + increase_call_count(exec_ctx, chand); + } +} + +/* Constructor for call_data. */ +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + const grpc_call_element_args* args) { + channel_data* chand = elem->channel_data; + increase_call_count(exec_ctx, chand); + return GRPC_ERROR_NONE; +} + +/* Destructor for call_data. */ +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* ignored) { + channel_data* chand = elem->channel_data; + decrease_call_count(exec_ctx, chand); +} + +/* Constructor for channel_data. */ +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, + grpc_channel_element_args* args) { + channel_data* chand = elem->channel_data; + gpr_mu_init(&chand->max_age_timer_mu); + chand->max_age_timer_pending = false; + chand->max_age_grace_timer_pending = false; + chand->channel_stack = args->channel_stack; + chand->max_connection_age = + DEFAULT_MAX_CONNECTION_AGE_MS == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_AGE_MS, GPR_TIMESPAN); + chand->max_connection_age_grace = + DEFAULT_MAX_CONNECTION_AGE_GRACE_MS == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_AGE_GRACE_MS, + GPR_TIMESPAN); + chand->max_connection_idle = + DEFAULT_MAX_CONNECTION_IDLE_MS == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_IDLE_MS, GPR_TIMESPAN); + for (size_t i = 0; i < args->channel_args->num_args; ++i) { + if (0 == strcmp(args->channel_args->args[i].key, + GRPC_ARG_MAX_CONNECTION_AGE_MS)) { + const int value = grpc_channel_arg_get_integer( + &args->channel_args->args[i], + (grpc_integer_options){DEFAULT_MAX_CONNECTION_AGE_MS, 1, INT_MAX}); + chand->max_connection_age = + value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_millis(value, GPR_TIMESPAN); + } else if (0 == strcmp(args->channel_args->args[i].key, + GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)) { + const int value = grpc_channel_arg_get_integer( + &args->channel_args->args[i], + (grpc_integer_options){DEFAULT_MAX_CONNECTION_AGE_GRACE_MS, 0, + INT_MAX}); + chand->max_connection_age_grace = + value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_millis(value, GPR_TIMESPAN); + } else if (0 == strcmp(args->channel_args->args[i].key, + GRPC_ARG_MAX_CONNECTION_IDLE_MS)) { + const int value = grpc_channel_arg_get_integer( + &args->channel_args->args[i], + (grpc_integer_options){DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX}); + chand->max_connection_idle = + value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_millis(value, GPR_TIMESPAN); + } + } + grpc_closure_init(&chand->close_max_idle_channel, close_max_idle_channel, + chand, grpc_schedule_on_exec_ctx); + grpc_closure_init(&chand->close_max_age_channel, close_max_age_channel, chand, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&chand->force_close_max_age_channel, + force_close_max_age_channel, chand, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&chand->start_max_idle_timer_after_init, + start_max_idle_timer_after_init, chand, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&chand->start_max_age_timer_after_init, + start_max_age_timer_after_init, chand, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&chand->start_max_age_grace_timer_after_goaway_op, + start_max_age_grace_timer_after_goaway_op, chand, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&chand->channel_connectivity_changed, + channel_connectivity_changed, chand, + grpc_schedule_on_exec_ctx); + + if (gpr_time_cmp(chand->max_connection_age, gpr_inf_future(GPR_TIMESPAN)) != + 0) { + /* When the channel reaches its max age, we send down an op with + goaway_error set. However, we can't send down any ops until after the + channel stack is fully initialized. If we start the timer here, we have + no guarantee that the timer won't pop before channel stack initialization + is finished. To avoid that problem, we create a closure to start the + timer, and we schedule that closure to be run after call stack + initialization is done. */ + GRPC_CHANNEL_STACK_REF(chand->channel_stack, + "max_age start_max_age_timer_after_init"); + grpc_closure_sched(exec_ctx, &chand->start_max_age_timer_after_init, + GRPC_ERROR_NONE); + } + + /* Initialize the number of calls as 1, so that the max_idle_timer will not + start until start_max_idle_timer_after_init is invoked. */ + gpr_atm_rel_store(&chand->call_count, 1); + if (gpr_time_cmp(chand->max_connection_idle, gpr_inf_future(GPR_TIMESPAN)) != + 0) { + GRPC_CHANNEL_STACK_REF(chand->channel_stack, + "max_age start_max_idle_timer_after_init"); + grpc_closure_sched(exec_ctx, &chand->start_max_idle_timer_after_init, + GRPC_ERROR_NONE); + } + return GRPC_ERROR_NONE; +} + +/* Destructor for channel_data. */ +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) {} + +const grpc_channel_filter grpc_max_age_filter = { + grpc_call_next_op, + grpc_channel_next_op, + 0, /* sizeof_call_data */ + init_call_elem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + grpc_channel_next_get_info, + "max_age"}; diff --git a/src/core/lib/channel/max_age_filter.h b/src/core/lib/channel/max_age_filter.h new file mode 100644 index 0000000000..93e357a88e --- /dev/null +++ b/src/core/lib/channel/max_age_filter.h @@ -0,0 +1,39 @@ +// +// Copyright 2017, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#ifndef GRPC_CORE_LIB_CHANNEL_MAX_AGE_FILTER_H +#define GRPC_CORE_LIB_CHANNEL_MAX_AGE_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" + +extern const grpc_channel_filter grpc_max_age_filter; + +#endif /* GRPC_CORE_LIB_CHANNEL_MAX_AGE_FILTER_H */ diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 0873d9c285..57726c8476 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -132,13 +132,13 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, gpr_free(message_string); } // Invoke the next callback. - grpc_closure_sched(exec_ctx, calld->next_recv_message_ready, error); + grpc_closure_run(exec_ctx, calld->next_recv_message_ready, error); } // Start transport stream op. -static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - grpc_transport_stream_op* op) { +static void start_transport_stream_op_batch( + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* op) { call_data* calld = elem->call_data; // Check max send message size. if (op->send_message && calld->max_send_size >= 0 && @@ -148,7 +148,7 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", op->payload->send_message.send_message->length, calld->max_send_size); - grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, @@ -157,7 +157,7 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, return; } // Inject callback for receiving a message. - if (op->payload->recv_message.recv_message_ready != NULL) { + if (op->recv_message) { calld->next_recv_message_ready = op->payload->recv_message.recv_message_ready; calld->recv_message = op->payload->recv_message.recv_message; @@ -256,7 +256,7 @@ static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, } const grpc_channel_filter grpc_message_size_filter = { - start_transport_stream_op, + start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index be6a6d618a..fc338342e4 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -43,7 +43,7 @@ #include "src/core/lib/security/transport/security_handshaker.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" -#include "src/core/lib/tsi/ssl_transport_security.h" +#include "src/core/tsi/ssl_transport_security.h" typedef struct { grpc_channel_security_connector base; diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 1924e76f13..7014b98349 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -56,6 +56,7 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" @@ -1107,19 +1108,20 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, grpc_closure *closure) { while (true) { - /* Fast-path: CLOSURE_NOT_READY -> <closure>. - The 'release' cas here matches the 'acquire' load in set_ready and - set_shutdown ensuring that the closure (scheduled by set_ready or - set_shutdown) happens-after the I/O event on the fd */ - if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) { - return; /* Fast-path successful. Return */ - } - - /* Slowpath. The 'acquire' load matches the 'release' cas in set_ready and - set_shutdown */ - gpr_atm curr = gpr_atm_acq_load(state); + gpr_atm curr = gpr_atm_no_barrier_load(state); switch (curr) { case CLOSURE_NOT_READY: { + /* CLOSURE_NOT_READY -> <closure>. + + We're guaranteed by API that there's an acquire barrier before here, + so there's no need to double-dip and this can be a release-only. + + The release itself pairs with the acquire half of a set_ready full + barrier. */ + if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) { + return; /* Successful. Return */ + } + break; /* retry */ } @@ -1134,7 +1136,7 @@ static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, is no other code that needs to 'happen-after' this) */ if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) { grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); - return; /* Slow-path successful. Return */ + return; /* Successful. Return */ } break; /* retry */ @@ -1165,30 +1167,19 @@ static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, grpc_error *shutdown_err) { - /* Try the fast-path first (i.e expect the current value to be - CLOSURE_NOT_READY */ - gpr_atm curr = CLOSURE_NOT_READY; gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT; while (true) { - /* The 'release' cas here matches the 'acquire' load in notify_on to ensure - that the closure it schedules 'happens-after' the set_shutdown is called - on the fd */ - if (gpr_atm_rel_cas(state, curr, new_state)) { - return; /* Fast-path successful. Return */ - } - - /* Fallback to slowpath. This 'acquire' load matches the 'release' cas in - notify_on and set_ready */ - curr = gpr_atm_acq_load(state); + gpr_atm curr = gpr_atm_no_barrier_load(state); switch (curr) { - case CLOSURE_READY: { + case CLOSURE_READY: + case CLOSURE_NOT_READY: + /* Need a full barrier here so that the initial load in notify_on + doesn't need a barrier */ + if (gpr_atm_full_cas(state, curr, new_state)) { + return; /* early out */ + } break; /* retry */ - } - - case CLOSURE_NOT_READY: { - break; /* retry */ - } default: { /* 'curr' is either a closure or the fd is already shutdown */ @@ -1199,10 +1190,11 @@ static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, } /* Fd is not shutdown. Schedule the closure and move the state to - shutdown state. The 'release' cas here matches the 'acquire' load in - notify_on to ensure that the closure it schedules 'happens-after' - the set_shutdown is called on the fd */ - if (gpr_atm_rel_cas(state, curr, new_state)) { + shutdown state. + Needs an acquire to pair with setting the closure (and get a + happens-after on that edge), and a release to pair with anything + loading the shutdown state. */ + if (gpr_atm_full_cas(state, curr, new_state)) { grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "FD Shutdown", &shutdown_err, 1)); @@ -1220,52 +1212,42 @@ static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, } static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) { - /* Try an optimistic case first (i.e assume current state is - CLOSURE_NOT_READY). - - This 'release' cas matches the 'acquire' load in notify_on ensuring that - any closure (scheduled by notify_on) 'happens-after' the return from - epoll_pwait */ - if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) { - return; /* early out */ - } - - /* The 'acquire' load here matches the 'release' cas in notify_on and - set_shutdown */ - gpr_atm curr = gpr_atm_acq_load(state); - switch (curr) { - case CLOSURE_READY: { - /* Already ready. We are done here */ - break; - } + while (true) { + gpr_atm curr = gpr_atm_no_barrier_load(state); - case CLOSURE_NOT_READY: { - /* The state was not CLOSURE_NOT_READY when we checked initially at the - beginning of this function but now it is CLOSURE_NOT_READY again. - This is only possible if the state transitioned out of - CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then - back to CLOSURE_NOT_READY again (i.e after we entered this function, - the fd became "ready" and the necessary actions were already done). - So there is no need to make the state CLOSURE_READY now */ - break; - } + switch (curr) { + case CLOSURE_READY: { + /* Already ready. We are done here */ + return; + } - default: { - /* 'curr' is either a closure or the fd is shutdown */ - if ((curr & FD_SHUTDOWN_BIT) > 0) { - /* The fd is shutdown. Do nothing */ - } else if (gpr_atm_no_barrier_cas(state, curr, CLOSURE_NOT_READY)) { - /* The cas above was no-barrier since the state is being transitioned to - CLOSURE_NOT_READY; notify_on and set_shutdown do not schedule any - closures when transitioning out of CLOSURE_NO_READY state (i.e there - is no other code that needs to 'happen-after' this) */ + case CLOSURE_NOT_READY: { + /* No barrier required as we're transitioning to a state that does not + involve a closure */ + if (gpr_atm_no_barrier_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) { + return; /* early out */ + } + break; /* retry */ + } - grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE); + default: { + /* 'curr' is either a closure or the fd is shutdown */ + if ((curr & FD_SHUTDOWN_BIT) > 0) { + /* The fd is shutdown. Do nothing */ + return; + } + /* Full cas: acquire pairs with this cas' release in the event of a + spurious set_ready; release pairs with this or the acquire in + notify_on (or set_shutdown) */ + else if (gpr_atm_full_cas(state, curr, CLOSURE_NOT_READY)) { + grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE); + return; + } + /* else the state changed again (only possible by either a racing + set_ready or set_shutdown functions. In both these cases, the closure + would have been scheduled for execution. So we are done here */ + return; } - /* else the state changed again (only possible by either a racing - set_ready or set_shutdown functions. In both these cases, the closure - would have been scheduled for execution. So we are done here */ - break; } } } @@ -1486,8 +1468,9 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, return 0; } timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis(gpr_time_add( + int millis = gpr_time_to_millis(gpr_time_add( timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); + return millis >= 1 ? millis : 1; } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, @@ -1669,6 +1652,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, for (int i = 0; i < ep_rv; ++i) { void *data_ptr = ep_ev[i].data.ptr; if (data_ptr == &global_wakeup_fd) { + grpc_timer_consume_kick(); append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); } else if (data_ptr == &pi->workqueue_wakeup_fd) { diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index ca6e855611..d90f223362 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -52,6 +52,7 @@ #include <grpc/support/useful.h> #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_cv.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" @@ -1006,6 +1007,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else { if (pfds[0].revents & POLLIN_CHECK) { + grpc_timer_consume_kick(); work_combine_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd)); } diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index d84a278b18..e0338f93c7 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -101,6 +101,9 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, void grpc_timer_list_init(gpr_timespec now); void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx); +/* Consume a kick issued by grpc_kick_poller */ +void grpc_timer_consume_kick(void); + /* the following must be implemented by each iomgr implementation */ void grpc_kick_poller(void); diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index e53c801929..d8e6068431 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -37,9 +37,13 @@ #include "src/core/lib/iomgr/timer.h" +#include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> +#include <grpc/support/tls.h> #include <grpc/support/useful.h> +#include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/time_averaged_stats.h" #include "src/core/lib/iomgr/timer_heap.h" #include "src/core/lib/support/spinlock.h" @@ -52,12 +56,15 @@ #define MIN_QUEUE_WINDOW_DURATION 0.01 #define MAX_QUEUE_WINDOW_DURATION 1 +int grpc_timer_trace = 0; +int grpc_timer_check_trace = 0; + typedef struct { gpr_mu mu; grpc_time_averaged_stats stats; /* All and only timers with deadlines <= this will be in the heap. */ - gpr_timespec queue_deadline_cap; - gpr_timespec min_deadline; + gpr_atm queue_deadline_cap; + gpr_atm min_deadline; /* Index in the g_shard_queue */ uint32_t shard_queue_index; /* This holds all timers with deadlines < queue_deadline_cap. Timers in this @@ -67,38 +74,92 @@ typedef struct { grpc_timer list; } shard_type; -/* Protects g_shard_queue */ -static gpr_mu g_mu; -/* Allow only one run_some_expired_timers at once */ -static gpr_spinlock g_checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER; +struct shared_mutables { + gpr_atm min_timer; + /* Allow only one run_some_expired_timers at once */ + gpr_spinlock checker_mu; + bool initialized; + /* Protects g_shard_queue */ + gpr_mu mu; +} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE); + +static struct shared_mutables g_shared_mutables = { + .checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false, +}; static gpr_clock_type g_clock_type; static shard_type g_shards[NUM_SHARDS]; -/* Protected by g_mu */ +/* Protected by g_shared_mutables.mu */ static shard_type *g_shard_queue[NUM_SHARDS]; -static bool g_initialized = false; +static gpr_timespec g_start_time; + +GPR_TLS_DECL(g_last_seen_min_timer); + +static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { + if (a > GPR_ATM_MAX - b) { + return GPR_ATM_MAX; + } + return a + b; +} + +static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, + gpr_atm *next, grpc_error *error); + +static gpr_timespec dbl_to_ts(double d) { + gpr_timespec ts; + ts.tv_sec = (int64_t)d; + ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec)); + ts.clock_type = GPR_TIMESPAN; + return ts; +} + +static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time); + double x = GPR_MS_PER_SEC * (double)ts.tv_sec + + (double)ts.tv_nsec / GPR_NS_PER_MS + + (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time); + double x = + GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} -static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next, grpc_error *error); +static gpr_timespec atm_to_timespec(gpr_atm x) { + return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0)); +} -static gpr_timespec compute_min_deadline(shard_type *shard) { +static gpr_atm compute_min_deadline(shard_type *shard) { return grpc_timer_heap_is_empty(&shard->heap) - ? shard->queue_deadline_cap + ? saturating_add(shard->queue_deadline_cap, 1) : grpc_timer_heap_top(&shard->heap)->deadline; } void grpc_timer_list_init(gpr_timespec now) { uint32_t i; - g_initialized = true; - gpr_mu_init(&g_mu); + g_shared_mutables.initialized = true; + gpr_mu_init(&g_shared_mutables.mu); g_clock_type = now.clock_type; + g_start_time = now; + g_shared_mutables.min_timer = timespec_to_atm_round_down(now); + gpr_tls_init(&g_last_seen_min_timer); + gpr_tls_set(&g_last_seen_min_timer, 0); + grpc_register_tracer("timer", &grpc_timer_trace); + grpc_register_tracer("timer_check", &grpc_timer_check_trace); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_init(&shard->mu); grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, 0.5); - shard->queue_deadline_cap = now; + shard->queue_deadline_cap = g_shared_mutables.min_timer; shard->shard_queue_index = i; grpc_timer_heap_init(&shard->heap); shard->list.next = shard->list.prev = &shard->list; @@ -110,29 +171,23 @@ void grpc_timer_list_init(gpr_timespec now) { void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { int i; run_some_expired_timers( - exec_ctx, gpr_inf_future(g_clock_type), NULL, + exec_ctx, GPR_ATM_MAX, NULL, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_destroy(&shard->mu); grpc_timer_heap_destroy(&shard->heap); } - gpr_mu_destroy(&g_mu); - g_initialized = false; + gpr_mu_destroy(&g_shared_mutables.mu); + gpr_tls_destroy(&g_last_seen_min_timer); + g_shared_mutables.initialized = false; } static double ts_to_dbl(gpr_timespec ts) { return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; } -static gpr_timespec dbl_to_ts(double d) { - gpr_timespec ts; - ts.tv_sec = (int64_t)d; - ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec)); - ts.clock_type = GPR_TIMESPAN; - return ts; -} - +/* returns true if the first element in the list */ static void list_join(grpc_timer *head, grpc_timer *timer) { timer->next = head; timer->prev = head->prev; @@ -158,15 +213,13 @@ static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) { static void note_deadline_change(shard_type *shard) { while (shard->shard_queue_index > 0 && - gpr_time_cmp( - shard->min_deadline, - g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) { + shard->min_deadline < + g_shard_queue[shard->shard_queue_index - 1]->min_deadline) { swap_adjacent_shards_in_queue(shard->shard_queue_index - 1); } while (shard->shard_queue_index < NUM_SHARDS - 1 && - gpr_time_cmp( - shard->min_deadline, - g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) { + shard->min_deadline > + g_shard_queue[shard->shard_queue_index + 1]->min_deadline) { swap_adjacent_shards_in_queue(shard->shard_queue_index); } } @@ -179,9 +232,17 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; - timer->deadline = deadline; + timer->deadline = timespec_to_atm_round_up(deadline); + + if (grpc_timer_trace) { + gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR + "] now %" PRId64 ".%09d [%" PRIdPTR "] call %p[%p]", + timer, deadline.tv_sec, deadline.tv_nsec, timer->deadline, + now.tv_sec, now.tv_nsec, timespec_to_atm_round_down(now), closure, + closure->cb); + } - if (!g_initialized) { + if (!g_shared_mutables.initialized) { timer->pending = false; grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -201,12 +262,18 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, grpc_time_averaged_stats_add_sample(&shard->stats, ts_to_dbl(gpr_time_sub(deadline, now))); - if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) { + if (timer->deadline < shard->queue_deadline_cap) { is_first_timer = grpc_timer_heap_add(&shard->heap, timer); } else { timer->heap_index = INVALID_HEAP_INDEX; list_join(&shard->list, timer); } + if (grpc_timer_trace) { + gpr_log(GPR_DEBUG, " .. add to shard %d with queue_deadline_cap=%" PRIdPTR + " => is_first_timer=%s", + (int)(shard - g_shards), shard->queue_deadline_cap, + is_first_timer ? "true" : "false"); + } gpr_mu_unlock(&shard->mu); /* Deadline may have decreased, we need to adjust the master queue. Note @@ -221,28 +288,41 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, In that case, the timer will simply have to wait for the next grpc_timer_check. */ if (is_first_timer) { - gpr_mu_lock(&g_mu); - if (gpr_time_cmp(deadline, shard->min_deadline) < 0) { - gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline; - shard->min_deadline = deadline; + gpr_mu_lock(&g_shared_mutables.mu); + if (grpc_timer_trace) { + gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR, + shard->min_deadline); + } + if (timer->deadline < shard->min_deadline) { + gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; + shard->min_deadline = timer->deadline; note_deadline_change(shard); - if (shard->shard_queue_index == 0 && - gpr_time_cmp(deadline, old_min_deadline) < 0) { + if (shard->shard_queue_index == 0 && timer->deadline < old_min_deadline) { + gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, timer->deadline); grpc_kick_poller(); } } - gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_shared_mutables.mu); } } +void grpc_timer_consume_kick(void) { + /* force re-evaluation of last seeen min */ + gpr_tls_set(&g_last_seen_min_timer, 0); +} + void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { - if (!g_initialized) { + if (!g_shared_mutables.initialized) { /* must have already been cancelled, also the shard mutex is invalid */ return; } shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; gpr_mu_lock(&shard->mu); + if (grpc_timer_trace) { + gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer, + timer->pending ? "true" : "false"); + } if (timer->pending) { grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); timer->pending = false; @@ -260,7 +340,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { for timers that fall at or under it. Returns true if the queue is no longer empty. REQUIRES: shard->mu locked */ -static int refill_queue(shard_type *shard, gpr_timespec now) { +static int refill_queue(shard_type *shard, gpr_atm now) { /* Compute the new queue window width and bound by the limits: */ double computed_deadline_delta = grpc_time_averaged_stats_update_average(&shard->stats) * @@ -271,12 +351,22 @@ static int refill_queue(shard_type *shard, gpr_timespec now) { grpc_timer *timer, *next; /* Compute the new cap and put all timers under it into the queue: */ - shard->queue_deadline_cap = gpr_time_add( - gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta)); + shard->queue_deadline_cap = + saturating_add(GPR_MAX(now, shard->queue_deadline_cap), + (gpr_atm)(deadline_delta * 1000.0)); + + if (grpc_timer_check_trace) { + gpr_log(GPR_DEBUG, " .. shard[%d]->queue_deadline_cap --> %" PRIdPTR, + (int)(shard - g_shards), shard->queue_deadline_cap); + } for (timer = shard->list.next; timer != &shard->list; timer = next) { next = timer->next; - if (gpr_time_cmp(timer->deadline, shard->queue_deadline_cap) < 0) { + if (timer->deadline < shard->queue_deadline_cap) { + if (grpc_timer_check_trace) { + gpr_log(GPR_DEBUG, " .. add timer with deadline %" PRIdPTR " to heap", + timer->deadline); + } list_remove(timer); grpc_timer_heap_add(&shard->heap, timer); } @@ -287,15 +377,29 @@ static int refill_queue(shard_type *shard, gpr_timespec now) { /* This pops the next non-cancelled timer with deadline <= now from the queue, or returns NULL if there isn't one. REQUIRES: shard->mu locked */ -static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { +static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { grpc_timer *timer; for (;;) { + if (grpc_timer_check_trace) { + gpr_log(GPR_DEBUG, " .. shard[%d]: heap_empty=%s", + (int)(shard - g_shards), + grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false"); + } if (grpc_timer_heap_is_empty(&shard->heap)) { - if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL; + if (now < shard->queue_deadline_cap) return NULL; if (!refill_queue(shard, now)) return NULL; } timer = grpc_timer_heap_top(&shard->heap); - if (gpr_time_cmp(timer->deadline, now) > 0) return NULL; + if (grpc_timer_check_trace) { + gpr_log(GPR_DEBUG, + " .. check top timer deadline=%" PRIdPTR " now=%" PRIdPTR, + timer->deadline, now); + } + if (timer->deadline > now) return NULL; + if (grpc_timer_trace) { + gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late", timer, + now - timer->deadline); + } timer->pending = false; grpc_timer_heap_pop(&shard->heap); return timer; @@ -304,7 +408,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { /* REQUIRES: shard->mu unlocked */ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, - gpr_timespec now, gpr_timespec *new_min_deadline, + gpr_atm now, gpr_atm *new_min_deadline, grpc_error *error) { size_t n = 0; grpc_timer *timer; @@ -318,17 +422,29 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, return n; } -static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next, grpc_error *error) { +static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, + gpr_atm *next, grpc_error *error) { size_t n = 0; - /* TODO(ctiller): verify that there are any timers (atomically) here */ + gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer); + gpr_tls_set(&g_last_seen_min_timer, min_timer); + if (now < min_timer) { + if (next != NULL) *next = GPR_MIN(*next, min_timer); + return 0; + } - if (gpr_spinlock_trylock(&g_checker_mu)) { - gpr_mu_lock(&g_mu); + if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) { + gpr_mu_lock(&g_shared_mutables.mu); - while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) { - gpr_timespec new_min_deadline; + if (grpc_timer_check_trace) { + gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR, + (int)(g_shard_queue[0] - g_shards), + g_shard_queue[0]->min_deadline); + } + + while (g_shard_queue[0]->min_deadline < now || + (now != GPR_ATM_MAX && g_shard_queue[0]->min_deadline == now)) { + gpr_atm new_min_deadline; /* For efficiency, we pop as many available timers as we can from the shard. This may violate perfect timer deadline ordering, but that @@ -336,6 +452,14 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, n += pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error); + if (grpc_timer_check_trace) { + gpr_log(GPR_DEBUG, " .. popped --> %" PRIdPTR + ", shard[%d]->min_deadline %" PRIdPTR + " --> %" PRIdPTR ", now=%" PRIdPTR, + n, (int)(g_shard_queue[0] - g_shards), + g_shard_queue[0]->min_deadline, new_min_deadline, now); + } + /* An grpc_timer_init() on the shard could intervene here, adding a new timer that is earlier than new_min_deadline. However, grpc_timer_init() will block on the master_lock before it can call @@ -346,23 +470,24 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, } if (next) { - *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline); + *next = GPR_MIN(*next, g_shard_queue[0]->min_deadline); } - gpr_mu_unlock(&g_mu); - gpr_spinlock_unlock(&g_checker_mu); + gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, + g_shard_queue[0]->min_deadline); + gpr_mu_unlock(&g_shared_mutables.mu); + gpr_spinlock_unlock(&g_shared_mutables.checker_mu); } else if (next != NULL) { /* TODO(ctiller): this forces calling code to do an short poll, and then retry the timer check (because this time through the timer list was contended). - We could reduce the cost here dramatically by keeping a count of how many - currently active pollers got through the uncontended case above + We could reduce the cost here dramatically by keeping a count of how + many currently active pollers got through the uncontended case above successfully, and waking up other pollers IFF that count drops to zero. Once that count is in place, this entire else branch could disappear. */ - *next = gpr_time_min( - *next, gpr_time_add(now, gpr_time_from_millis(1, GPR_TIMESPAN))); + *next = GPR_MIN(*next, now + 1); } GRPC_ERROR_UNREF(error); @@ -372,12 +497,71 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next) { + // prelude GPR_ASSERT(now.clock_type == g_clock_type); - return run_some_expired_timers( - exec_ctx, now, next, + gpr_atm now_atm = timespec_to_atm_round_down(now); + + /* fetch from a thread-local first: this avoids contention on a globally + mutable cacheline in the common case */ + gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer); + if (now_atm < min_timer) { + if (next != NULL) { + *next = + atm_to_timespec(GPR_MIN(timespec_to_atm_round_up(*next), min_timer)); + } + if (grpc_timer_check_trace) { + gpr_log(GPR_DEBUG, + "TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR, + now_atm, min_timer); + } + return 0; + } + + grpc_error *shutdown_error = gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 ? GRPC_ERROR_NONE - : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system")); + : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system"); + + // tracing + if (grpc_timer_check_trace) { + char *next_str; + if (next == NULL) { + next_str = gpr_strdup("NULL"); + } else { + gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, + next->tv_nsec, timespec_to_atm_round_down(*next)); + } + gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR + "] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, + now.tv_sec, now.tv_nsec, now_atm, next_str, + gpr_tls_get(&g_last_seen_min_timer), + gpr_atm_no_barrier_load(&g_shared_mutables.min_timer)); + gpr_free(next_str); + } + // actual code + bool r; + gpr_atm next_atm; + if (next == NULL) { + r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error); + } else { + next_atm = timespec_to_atm_round_down(*next); + r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error); + *next = atm_to_timespec(next_atm); + } + // tracing + if (grpc_timer_check_trace) { + char *next_str; + if (next == NULL) { + next_str = gpr_strdup("NULL"); + } else { + gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, + next->tv_nsec, next_atm); + } + gpr_log(GPR_DEBUG, "TIMER CHECK END: %d timers triggered; next=%s", r, + next_str); + gpr_free(next_str); + } + return r > 0; } #endif /* GRPC_TIMER_USE_GENERIC */ diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h index 1608dce9fb..c79a431aa0 100644 --- a/src/core/lib/iomgr/timer_generic.h +++ b/src/core/lib/iomgr/timer_generic.h @@ -38,7 +38,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" struct grpc_timer { - gpr_timespec deadline; + gpr_atm deadline; uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */ bool pending; struct grpc_timer *next; diff --git a/src/core/lib/iomgr/timer_heap.c b/src/core/lib/iomgr/timer_heap.c index f736d335e6..03ccfe023a 100644 --- a/src/core/lib/iomgr/timer_heap.c +++ b/src/core/lib/iomgr/timer_heap.c @@ -50,7 +50,7 @@ static void adjust_upwards(grpc_timer **first, uint32_t i, grpc_timer *t) { while (i > 0) { uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(first[parent]->deadline, t->deadline) <= 0) break; + if (first[parent]->deadline <= t->deadline) break; first[i] = first[parent]; first[i]->heap_index = i; i = parent; @@ -68,12 +68,12 @@ static void adjust_downwards(grpc_timer **first, uint32_t i, uint32_t length, uint32_t left_child = 1u + 2u * i; if (left_child >= length) break; uint32_t right_child = left_child + 1; - uint32_t next_i = right_child < length && - gpr_time_cmp(first[left_child]->deadline, - first[right_child]->deadline) > 0 - ? right_child - : left_child; - if (gpr_time_cmp(t->deadline, first[next_i]->deadline) <= 0) break; + uint32_t next_i = + right_child < length && + first[left_child]->deadline > first[right_child]->deadline + ? right_child + : left_child; + if (t->deadline <= first[next_i]->deadline) break; first[i] = first[next_i]; first[i]->heap_index = i; i = next_i; @@ -97,7 +97,7 @@ static void maybe_shrink(grpc_timer_heap *heap) { static void note_changed_priority(grpc_timer_heap *heap, grpc_timer *timer) { uint32_t i = timer->heap_index; uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(heap->timers[parent]->deadline, timer->deadline) > 0) { + if (heap->timers[parent]->deadline > timer->deadline) { adjust_upwards(heap->timers, i, timer); } else { adjust_downwards(heap->timers, i, heap->timer_count, timer); diff --git a/src/core/lib/security/credentials/jwt/json_token.c b/src/core/lib/security/credentials/jwt/json_token.c index 192a5f47ed..aa905725fc 100644 --- a/src/core/lib/security/credentials/jwt/json_token.c +++ b/src/core/lib/security/credentials/jwt/json_token.c @@ -40,8 +40,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> -#include "src/core/lib/security/util/b64.h" #include "src/core/lib/security/util/json_util.h" +#include "src/core/lib/slice/b64.h" #include "src/core/lib/support/string.h" #include <openssl/bio.h> diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index 5c59cf0f4a..0e2a264371 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -45,10 +45,10 @@ #include "src/core/lib/http/httpcli.h" #include "src/core/lib/iomgr/polling_entity.h" -#include "src/core/lib/security/util/b64.h" +#include "src/core/lib/slice/b64.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" -#include "src/core/lib/tsi/ssl_types.h" +#include "src/core/tsi/ssl_types.h" /* --- Utils. --- */ diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index b69f38758c..f526653ffa 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -64,7 +64,7 @@ typedef struct { pollset_set so that work can progress when this call wants work to progress */ grpc_polling_entity *pollent; - grpc_transport_stream_op op; + grpc_transport_stream_op_batch op; uint8_t security_context_set; grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; grpc_auth_metadata_context auth_md_context; @@ -108,7 +108,7 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, const char *error_details) { grpc_call_element *elem = (grpc_call_element *)user_data; call_data *calld = elem->call_data; - grpc_transport_stream_op *op = &calld->op; + grpc_transport_stream_op_batch *op = &calld->op; grpc_metadata_batch *mdb; size_t i; reset_auth_metadata_context(&calld->auth_md_context); @@ -136,7 +136,7 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, if (error == GRPC_ERROR_NONE) { grpc_call_next_op(exec_ctx, elem, op); } else { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); } } @@ -172,7 +172,7 @@ void build_auth_metadata_context(grpc_security_connector *sc, static void send_security_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_client_security_context *ctx = @@ -193,7 +193,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, calld->creds = grpc_composite_call_credentials_create(channel_call_creds, ctx->creds, NULL); if (calld->creds == NULL) { - grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -244,7 +244,7 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data, that is being sent or received. */ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { GPR_TIMER_BEGIN("auth_start_transport_op", 0); /* grab pointers to our data from the call element */ diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 568d70fa38..24da949e48 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -49,7 +49,7 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" -#include "src/core/lib/tsi/transport_security_interface.h" +#include "src/core/tsi/transport_security_interface.h" #define STAGING_BUFFER_SIZE 8192 diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index b0cbc83639..2b51706161 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -54,8 +54,8 @@ #include "src/core/lib/security/transport/security_handshaker.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" -#include "src/core/lib/tsi/fake_transport_security.h" -#include "src/core/lib/tsi/ssl_transport_security.h" +#include "src/core/tsi/fake_transport_security.h" +#include "src/core/tsi/ssl_transport_security.h" /* -- Constants. -- */ diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h index 3df2fecd39..cf56cb3183 100644 --- a/src/core/lib/security/transport/security_connector.h +++ b/src/core/lib/security/transport/security_connector.h @@ -39,7 +39,7 @@ #include "src/core/lib/channel/handshaker.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/tcp_server.h" -#include "src/core/lib/tsi/transport_security_interface.h" +#include "src/core/tsi/transport_security_interface.h" /* --- status enum. --- */ diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index b103b7400c..1aca76f9e8 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -49,7 +49,7 @@ typedef struct call_data { up-call on transport_op, and remember to call our on_done_recv member after handling it. */ grpc_closure auth_on_recv; - grpc_transport_stream_op *transport_op; + grpc_transport_stream_op_batch *transport_op; grpc_metadata_array md; const grpc_metadata *consumed_md; size_t num_consumed_md; @@ -138,13 +138,11 @@ static void on_md_processing_done( error_details = error_details != NULL ? error_details : "Authentication metadata processing failed."; - calld->transport_op->send_initial_metadata = NULL; if (calld->transport_op->send_message) { grpc_byte_stream_destroy( &exec_ctx, calld->transport_op->payload->send_message.send_message); - calld->transport_op->send_message = false; + calld->transport_op->payload->send_message.send_message = NULL; } - calld->transport_op->send_trailing_metadata = NULL; grpc_closure_sched( &exec_ctx, calld->on_done_recv, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), @@ -172,7 +170,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, } static void set_recv_ops_md_callbacks(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; if (op->recv_initial_metadata) { @@ -194,7 +192,7 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem, that is being sent or received. */ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { set_recv_ops_md_callbacks(elem, op); grpc_call_next_op(exec_ctx, elem, op); } diff --git a/src/core/lib/security/transport/tsi_error.h b/src/core/lib/security/transport/tsi_error.h index 636fbb89cf..b84693b5de 100644 --- a/src/core/lib/security/transport/tsi_error.h +++ b/src/core/lib/security/transport/tsi_error.h @@ -35,7 +35,7 @@ #define GRPC_CORE_LIB_SECURITY_TRANSPORT_TSI_ERROR_H #include "src/core/lib/iomgr/error.h" -#include "src/core/lib/tsi/transport_security_interface.h" +#include "src/core/tsi/transport_security_interface.h" grpc_error *grpc_set_tsi_error_result(grpc_error *error, tsi_result result); diff --git a/src/core/lib/security/util/b64.c b/src/core/lib/slice/b64.c index 0d5a917660..2007cc4810 100644 --- a/src/core/lib/security/util/b64.c +++ b/src/core/lib/slice/b64.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/security/util/b64.h" +#include "src/core/lib/slice/b64.h" #include <stdint.h> #include <string.h> diff --git a/src/core/lib/security/util/b64.h b/src/core/lib/slice/b64.h index ef52291c6a..5cc821f4bf 100644 --- a/src/core/lib/security/util/b64.h +++ b/src/core/lib/slice/b64.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_SECURITY_UTIL_B64_H -#define GRPC_CORE_LIB_SECURITY_UTIL_B64_H +#ifndef GRPC_CORE_LIB_SLICE_B64_H +#define GRPC_CORE_LIB_SLICE_B64_H #include <grpc/slice.h> @@ -62,4 +62,4 @@ grpc_slice grpc_base64_decode(grpc_exec_ctx *exec_ctx, const char *b64, grpc_slice grpc_base64_decode_with_len(grpc_exec_ctx *exec_ctx, const char *b64, size_t b64_len, int url_safe); -#endif /* GRPC_CORE_LIB_SECURITY_UTIL_B64_H */ +#endif /* GRPC_CORE_LIB_SLICE_B64_H */ diff --git a/src/core/lib/support/time.c b/src/core/lib/support/time.c index 5a7d043aed..c5f94d46f7 100644 --- a/src/core/lib/support/time.c +++ b/src/core/lib/support/time.c @@ -42,7 +42,7 @@ int gpr_time_cmp(gpr_timespec a, gpr_timespec b) { int cmp = (a.tv_sec > b.tv_sec) - (a.tv_sec < b.tv_sec); GPR_ASSERT(a.clock_type == b.clock_type); - if (cmp == 0) { + if (cmp == 0 && a.tv_sec != INT64_MAX && a.tv_sec != INT64_MIN) { cmp = (a.tv_nsec > b.tv_nsec) - (a.tv_nsec < b.tv_nsec); } return cmp; @@ -244,15 +244,9 @@ gpr_timespec gpr_convert_clock_type(gpr_timespec t, gpr_clock_type clock_type) { return t; } - if (t.tv_nsec == 0) { - if (t.tv_sec == INT64_MAX) { - t.clock_type = clock_type; - return t; - } - if (t.tv_sec == INT64_MIN) { - t.clock_type = clock_type; - return t; - } + if (t.tv_sec == INT64_MAX || t.tv_sec == INT64_MIN) { + t.clock_type = clock_type; + return t; } if (clock_type == GPR_TIMESPAN) { diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index b3c686c4d9..315c906baa 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -117,9 +117,21 @@ static received_status unpack_received_status(gpr_atm atm) { typedef struct batch_control { grpc_call *call; + /* Share memory for cq_completion and notify_tag as they are never needed + simultaneously. Each byte used in this data structure count as six bytes + per call, so any savings we can make are worthwhile, + + We use notify_tag to determine whether or not to send notification to the + completion queue. Once we've made that determination, we can reuse the + memory for cq_completion. */ union { grpc_cq_completion cq_completion; struct { + /* Any given op indicates completion by either (a) calling a closure or + (b) sending a notification on the call's completion queue. If + \a is_closure is true, \a tag indicates a closure to be invoked; + otherwise, \a tag indicates the tag to be used in the notification to + be sent to the completion queue. */ void *tag; bool is_closure; } notify_tag; @@ -130,7 +142,7 @@ typedef struct batch_control { grpc_error *errors[MAX_ERRORS_PER_BATCH]; gpr_atm num_errors; - grpc_transport_stream_op op; + grpc_transport_stream_op_batch op; } batch_control; typedef struct { @@ -176,7 +188,7 @@ struct grpc_call { bool has_initial_md_been_received; batch_control *active_batches[MAX_CONCURRENT_BATCHES]; - grpc_transport_stream_op_payload stream_op_payload; + grpc_transport_stream_op_batch_payload stream_op_payload; /* first idx: is_receiving, second idx: is_trailing */ grpc_metadata_batch metadata_batch[2][2]; @@ -241,7 +253,7 @@ int grpc_call_error_trace = 0; CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op *op); + grpc_transport_stream_op_batch *op); static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_status_code status, const char *description); @@ -562,12 +574,12 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { } static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { grpc_call_element *elem; GPR_TIMER_BEGIN("execute_op", 0); elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->start_transport_stream_op(exec_ctx, elem, op); + elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op); GPR_TIMER_END("execute_op", 0); } @@ -620,7 +632,7 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_error *error) { GRPC_CALL_INTERNAL_REF(c, "termination"); set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); - grpc_transport_stream_op *op = grpc_make_transport_stream_op( + grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op( grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx)); op->cancel_stream = true; op->payload->cancel_stream.cancel_error = error; @@ -1410,8 +1422,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->completion_data.notify_tag.is_closure = (uint8_t)(is_notify_tag_closure != 0); - grpc_transport_stream_op *stream_op = &bctl->op; - grpc_transport_stream_op_payload *stream_op_payload = + grpc_transport_stream_op_batch *stream_op = &bctl->op; + grpc_transport_stream_op_batch_payload *stream_op_payload = &call->stream_op_payload; stream_op->covered_by_poller = true; @@ -1526,7 +1538,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, goto done_with_error; } stream_op->send_trailing_metadata = true; - call->sent_final_op = 1; + call->sent_final_op = true; stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; @@ -1550,7 +1562,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, goto done_with_error; } stream_op->send_trailing_metadata = true; - call->sent_final_op = 1; + call->sent_final_op = true; GPR_ASSERT(call->send_extra_metadata_count == 0); call->send_extra_metadata_count = 1; call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( @@ -1606,7 +1618,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, from server.c. In that case, it's coming from accept_stream, and in that case we're not necessarily covered by a poller. */ stream_op->covered_by_poller = call->is_client; - call->received_initial_metadata = 1; + call->received_initial_metadata = true; call->buffered_metadata[0] = op->data.recv_initial_metadata.recv_initial_metadata; grpc_closure_init(&call->receiving_initial_metadata_ready, @@ -1653,7 +1665,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->requested_final_op = 1; + call->requested_final_op = true; call->buffered_metadata[1] = op->data.recv_status_on_client.trailing_metadata; call->final_op.client.status = op->data.recv_status_on_client.status; @@ -1680,7 +1692,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->requested_final_op = 1; + call->requested_final_op = true; call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; stream_op->recv_trailing_metadata = true; @@ -1713,25 +1725,25 @@ done: done_with_error: /* reverse any mutations that occured */ if (stream_op->send_initial_metadata) { - call->sent_initial_metadata = 0; + call->sent_initial_metadata = false; grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]); } if (stream_op->send_message) { - call->sending_message = 0; + call->sending_message = false; grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base); } if (stream_op->send_trailing_metadata) { - call->sent_final_op = 0; + call->sent_final_op = false; grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]); } if (stream_op->recv_initial_metadata) { - call->received_initial_metadata = 0; + call->received_initial_metadata = false; } if (stream_op->recv_message) { - call->receiving_message = 0; + call->receiving_message = false; } if (stream_op->recv_trailing_metadata) { - call->requested_final_op = 0; + call->requested_final_op = false; } goto done; } diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index b4bfb92042..b3ba826bbc 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -150,17 +150,20 @@ grpc_channel *grpc_channel_create_with_builder( } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) { channel->compression_options.default_level.is_set = true; - GPR_ASSERT(args->args[i].value.integer >= 0 && - args->args[i].value.integer < GRPC_COMPRESS_LEVEL_COUNT); channel->compression_options.default_level.level = - (grpc_compression_level)args->args[i].value.integer; + (grpc_compression_level)grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){GRPC_COMPRESS_LEVEL_NONE, + GRPC_COMPRESS_LEVEL_NONE, + GRPC_COMPRESS_LEVEL_COUNT - 1}); } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) { channel->compression_options.default_algorithm.is_set = true; - GPR_ASSERT(args->args[i].value.integer >= 0 && - args->args[i].value.integer < GRPC_COMPRESS_ALGORITHMS_COUNT); channel->compression_options.default_algorithm.algorithm = - (grpc_compression_algorithm)args->args[i].value.integer; + (grpc_compression_algorithm)grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, + GRPC_COMPRESS_ALGORITHMS_COUNT - 1}); } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) { diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 91bd014a0e..b46ecac18d 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -47,6 +47,7 @@ #include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/channel/http_server_filter.h" +#include "src/core/lib/channel/max_age_filter.h" #include "src/core/lib/channel/message_size_filter.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/http/parser.h" @@ -114,6 +115,9 @@ static void register_builtin_channel_init() { GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, (void *)&grpc_server_deadline_filter); grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, + (void *)&grpc_max_age_filter); + grpc_channel_init_register_stage( GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, (void *)&grpc_message_size_filter); grpc_channel_init_register_stage( diff --git a/src/core/lib/surface/init_secure.c b/src/core/lib/surface/init_secure.c index 46b9a8f922..921ef87e36 100644 --- a/src/core/lib/surface/init_secure.c +++ b/src/core/lib/surface/init_secure.c @@ -43,7 +43,7 @@ #include "src/core/lib/security/transport/security_connector.h" #include "src/core/lib/security/transport/security_handshaker.h" #include "src/core/lib/surface/channel_init.h" -#include "src/core/lib/tsi/transport_security_interface.h" +#include "src/core/tsi/transport_security_interface.h" void grpc_security_pre_init(void) { grpc_register_tracer("secure_endpoint", &grpc_trace_secure_endpoint); diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 18b4f3691b..82428c42c0 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -80,9 +80,9 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, mdb->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } -static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void lame_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); if (op->recv_initial_metadata) { fill_metadata(exec_ctx, elem, @@ -91,7 +91,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, fill_metadata(exec_ctx, elem, op->payload->recv_trailing_metadata.recv_trailing_metadata); } - grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); } @@ -150,7 +150,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} const grpc_channel_filter grpc_lame_filter = { - lame_start_transport_stream_op, + lame_start_transport_stream_op_batch, lame_start_transport_op, sizeof(call_data), init_call_elem, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 0c8a382f38..191ee75252 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -776,7 +776,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, } static void server_mutate_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; if (op->recv_initial_metadata) { @@ -792,9 +792,9 @@ static void server_mutate_op(grpc_call_element *elem, } } -static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void server_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); server_mutate_op(elem, op); grpc_call_next_op(exec_ctx, elem, op); @@ -958,7 +958,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } const grpc_channel_filter grpc_server_top_filter = { - server_start_transport_stream_op, + server_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h index bcaf899910..df8d1f6fc0 100644 --- a/src/core/lib/transport/bdp_estimator.h +++ b/src/core/lib/transport/bdp_estimator.h @@ -73,4 +73,4 @@ void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator); // Completes a previously started ping void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator); -#endif +#endif /* GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H */ diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index c232bd56d3..82c4e004b7 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -170,7 +170,7 @@ int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx, void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { transport->vtable->perform_stream_op(exec_ctx, transport, stream, op); } @@ -213,9 +213,9 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, return transport->vtable->get_endpoint(exec_ctx, transport); } -void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, - grpc_transport_stream_op *op, - grpc_error *error) { +void grpc_transport_stream_op_batch_finish_with_failure( + grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, + grpc_error *error) { if (op->recv_message) { grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready, GRPC_ERROR_REF(error)); @@ -258,8 +258,8 @@ grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) { typedef struct { grpc_closure outer_on_complete; grpc_closure *inner_on_complete; - grpc_transport_stream_op op; - grpc_transport_stream_op_payload payload; + grpc_transport_stream_op_batch op; + grpc_transport_stream_op_batch_payload payload; } made_transport_stream_op; static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg, @@ -270,7 +270,7 @@ static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg, grpc_closure_run(exec_ctx, c, GRPC_ERROR_REF(error)); } -grpc_transport_stream_op *grpc_make_transport_stream_op( +grpc_transport_stream_op_batch *grpc_make_transport_stream_op( grpc_closure *on_complete) { made_transport_stream_op *op = gpr_zalloc(sizeof(*op)); op->op.payload = &op->payload; diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index ab179f585c..93369cc689 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -113,19 +113,19 @@ typedef struct { grpc_closure closure; } grpc_handler_private_op_data; -typedef struct grpc_transport_stream_op_payload - grpc_transport_stream_op_payload; +typedef struct grpc_transport_stream_op_batch_payload + grpc_transport_stream_op_batch_payload; /* Transport stream op: a set of operations to perform on a transport against a single stream */ -typedef struct grpc_transport_stream_op { +typedef struct grpc_transport_stream_op_batch { /** Should be enqueued when all requested operations (excluding recv_message and recv_initial_metadata which have their own closures) in a given batch have been completed. */ grpc_closure *on_complete; /** Values for the stream op (fields set are determined by flags above) */ - grpc_transport_stream_op_payload *payload; + grpc_transport_stream_op_batch_payload *payload; /** Is the completion of this op covered by a poller (if false: the op should complete independently of some pollset being polled) */ @@ -161,9 +161,9 @@ typedef struct grpc_transport_stream_op { * current handler of the op */ grpc_handler_private_op_data handler_private; -} grpc_transport_stream_op; +} grpc_transport_stream_op_batch; -struct grpc_transport_stream_op_payload { +struct grpc_transport_stream_op_batch_payload { struct { grpc_metadata_batch *send_initial_metadata; /** Iff send_initial_metadata != NULL, flags associated with @@ -289,11 +289,11 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_stream *stream, grpc_closure *then_schedule_closure); -void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, - grpc_transport_stream_op *op, - grpc_error *error); +void grpc_transport_stream_op_batch_finish_with_failure( + grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, + grpc_error *error); -char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); +char *grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op); char *grpc_transport_op_string(grpc_transport_op *op); /* Send a batch of operations on a transport @@ -304,11 +304,12 @@ char *grpc_transport_op_string(grpc_transport_op *op); transport - the transport on which to initiate the stream stream - the stream on which to send the operations. This must be non-NULL and previously initialized by the same transport. - op - a grpc_transport_stream_op specifying the op to perform */ + op - a grpc_transport_stream_op_batch specifying the op to perform + */ void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, - grpc_transport_stream_op *op); + grpc_transport_stream_op_batch *op); void grpc_transport_perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *transport, @@ -340,9 +341,10 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, /* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to \a on_consumed and then delete the returned transport op */ grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed); -/* Allocate a grpc_transport_stream_op, and preconfigure the on_consumed closure +/* Allocate a grpc_transport_stream_op_batch, and preconfigure the on_consumed + closure to \a on_consumed and then delete the returned transport op */ -grpc_transport_stream_op *grpc_make_transport_stream_op( +grpc_transport_stream_op_batch *grpc_make_transport_stream_op( grpc_closure *on_consumed); #ifdef __cplusplus diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index 6f688bf8d2..bbb19a34bd 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -59,7 +59,8 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_perform_stream_op */ void (*perform_stream_op)(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream, grpc_transport_stream_op *op); + grpc_stream *stream, + grpc_transport_stream_op_batch *op); /* implementation of grpc_transport_perform_op */ void (*perform_op)(grpc_exec_ctx *exec_ctx, grpc_transport *self, diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c index 0ec6a6ea5c..3a2a793e01 100644 --- a/src/core/lib/transport/transport_op_string.c +++ b/src/core/lib/transport/transport_op_string.c @@ -71,7 +71,8 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { } } -char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { +char *grpc_transport_stream_op_batch_string( + grpc_transport_stream_op_batch *op) { char *tmp; char *out; @@ -208,8 +209,9 @@ char *grpc_transport_op_string(grpc_transport_op *op) { } void grpc_call_log_op(char *file, int line, gpr_log_severity severity, - grpc_call_element *elem, grpc_transport_stream_op *op) { - char *str = grpc_transport_stream_op_string(op); + grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { + char *str = grpc_transport_stream_op_batch_string(op); gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str); gpr_free(str); } diff --git a/src/core/lib/tsi/README.md b/src/core/tsi/README.md index 3ca3c1ef38..3ca3c1ef38 100644 --- a/src/core/lib/tsi/README.md +++ b/src/core/tsi/README.md diff --git a/src/core/lib/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c index bbe323df3b..822fad51cb 100644 --- a/src/core/lib/tsi/fake_transport_security.c +++ b/src/core/tsi/fake_transport_security.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/tsi/fake_transport_security.h" +#include "src/core/tsi/fake_transport_security.h" #include <stdlib.h> #include <string.h> @@ -40,7 +40,7 @@ #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/useful.h> -#include "src/core/lib/tsi/transport_security.h" +#include "src/core/tsi/transport_security.h" /* --- Constants. ---*/ #define TSI_FAKE_FRAME_HEADER_SIZE 4 diff --git a/src/core/lib/tsi/fake_transport_security.h b/src/core/tsi/fake_transport_security.h index 54a9469b58..0697c7279f 100644 --- a/src/core/lib/tsi/fake_transport_security.h +++ b/src/core/tsi/fake_transport_security.h @@ -31,10 +31,10 @@ * */ -#ifndef GRPC_CORE_LIB_TSI_FAKE_TRANSPORT_SECURITY_H -#define GRPC_CORE_LIB_TSI_FAKE_TRANSPORT_SECURITY_H +#ifndef GRPC_CORE_TSI_FAKE_TRANSPORT_SECURITY_H +#define GRPC_CORE_TSI_FAKE_TRANSPORT_SECURITY_H -#include "src/core/lib/tsi/transport_security_interface.h" +#include "src/core/tsi/transport_security_interface.h" #ifdef __cplusplus extern "C" { @@ -58,4 +58,4 @@ tsi_frame_protector *tsi_create_fake_protector( } #endif -#endif /* GRPC_CORE_LIB_TSI_FAKE_TRANSPORT_SECURITY_H */ +#endif /* GRPC_CORE_TSI_FAKE_TRANSPORT_SECURITY_H */ diff --git a/src/core/lib/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 53aabdb926..a0325cc183 100644 --- a/src/core/lib/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/tsi/ssl_transport_security.h" +#include "src/core/tsi/ssl_transport_security.h" #include <grpc/support/port_platform.h> @@ -60,8 +60,8 @@ #include <openssl/x509.h> #include <openssl/x509v3.h> -#include "src/core/lib/tsi/ssl_types.h" -#include "src/core/lib/tsi/transport_security.h" +#include "src/core/tsi/ssl_types.h" +#include "src/core/tsi/transport_security.h" /* --- Constants. ---*/ diff --git a/src/core/lib/tsi/ssl_transport_security.h b/src/core/tsi/ssl_transport_security.h index 7407246118..0a527e9021 100644 --- a/src/core/lib/tsi/ssl_transport_security.h +++ b/src/core/tsi/ssl_transport_security.h @@ -31,10 +31,10 @@ * */ -#ifndef GRPC_CORE_LIB_TSI_SSL_TRANSPORT_SECURITY_H -#define GRPC_CORE_LIB_TSI_SSL_TRANSPORT_SECURITY_H +#ifndef GRPC_CORE_TSI_SSL_TRANSPORT_SECURITY_H +#define GRPC_CORE_TSI_SSL_TRANSPORT_SECURITY_H -#include "src/core/lib/tsi/transport_security_interface.h" +#include "src/core/tsi/transport_security_interface.h" #ifdef __cplusplus extern "C" { @@ -188,4 +188,4 @@ int tsi_ssl_peer_matches_name(const tsi_peer *peer, const char *name); } #endif -#endif /* GRPC_CORE_LIB_TSI_SSL_TRANSPORT_SECURITY_H */ +#endif /* GRPC_CORE_TSI_SSL_TRANSPORT_SECURITY_H */ diff --git a/src/core/lib/tsi/ssl_types.h b/src/core/tsi/ssl_types.h index 0a988effd0..065cb86800 100644 --- a/src/core/lib/tsi/ssl_types.h +++ b/src/core/tsi/ssl_types.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_TSI_SSL_TYPES_H -#define GRPC_CORE_LIB_TSI_SSL_TYPES_H +#ifndef GRPC_CORE_TSI_SSL_TYPES_H +#define GRPC_CORE_TSI_SSL_TYPES_H /* A collection of macros to cast between various integer types that are * used differently between BoringSSL and OpenSSL: @@ -52,4 +52,4 @@ #define TSI_SIZE_AS_SIZE(x) ((int)(x)) #endif -#endif /* GRPC_CORE_LIB_TSI_SSL_TYPES_H */ +#endif /* GRPC_CORE_TSI_SSL_TYPES_H */ diff --git a/src/core/lib/tsi/test_creds/BUILD b/src/core/tsi/test_creds/BUILD index 5cf04caf17..5cf04caf17 100644 --- a/src/core/lib/tsi/test_creds/BUILD +++ b/src/core/tsi/test_creds/BUILD diff --git a/src/core/lib/tsi/test_creds/README b/src/core/tsi/test_creds/README index eb8482d648..eb8482d648 100644 --- a/src/core/lib/tsi/test_creds/README +++ b/src/core/tsi/test_creds/README diff --git a/src/core/lib/tsi/test_creds/badclient.key b/src/core/tsi/test_creds/badclient.key index 5832685122..5832685122 100644 --- a/src/core/lib/tsi/test_creds/badclient.key +++ b/src/core/tsi/test_creds/badclient.key diff --git a/src/core/lib/tsi/test_creds/badclient.pem b/src/core/tsi/test_creds/badclient.pem index 1785970221..1785970221 100644 --- a/src/core/lib/tsi/test_creds/badclient.pem +++ b/src/core/tsi/test_creds/badclient.pem diff --git a/src/core/lib/tsi/test_creds/badserver.key b/src/core/tsi/test_creds/badserver.key index abfbde10ff..abfbde10ff 100644 --- a/src/core/lib/tsi/test_creds/badserver.key +++ b/src/core/tsi/test_creds/badserver.key diff --git a/src/core/lib/tsi/test_creds/badserver.pem b/src/core/tsi/test_creds/badserver.pem index 983c979f31..983c979f31 100644 --- a/src/core/lib/tsi/test_creds/badserver.pem +++ b/src/core/tsi/test_creds/badserver.pem diff --git a/src/core/lib/tsi/test_creds/ca-openssl.cnf b/src/core/tsi/test_creds/ca-openssl.cnf index e97b945e4b..e97b945e4b 100644 --- a/src/core/lib/tsi/test_creds/ca-openssl.cnf +++ b/src/core/tsi/test_creds/ca-openssl.cnf diff --git a/src/core/lib/tsi/test_creds/ca.key b/src/core/tsi/test_creds/ca.key index 03c4f950e3..03c4f950e3 100644 --- a/src/core/lib/tsi/test_creds/ca.key +++ b/src/core/tsi/test_creds/ca.key diff --git a/src/core/lib/tsi/test_creds/ca.pem b/src/core/tsi/test_creds/ca.pem index 6c8511a73c..6c8511a73c 100644 --- a/src/core/lib/tsi/test_creds/ca.pem +++ b/src/core/tsi/test_creds/ca.pem diff --git a/src/core/lib/tsi/test_creds/client.key b/src/core/tsi/test_creds/client.key index f48d0735d9..f48d0735d9 100644 --- a/src/core/lib/tsi/test_creds/client.key +++ b/src/core/tsi/test_creds/client.key diff --git a/src/core/lib/tsi/test_creds/client.pem b/src/core/tsi/test_creds/client.pem index e332091019..e332091019 100644 --- a/src/core/lib/tsi/test_creds/client.pem +++ b/src/core/tsi/test_creds/client.pem diff --git a/src/core/lib/tsi/test_creds/server0.key b/src/core/tsi/test_creds/server0.key index add153c9ae..add153c9ae 100644 --- a/src/core/lib/tsi/test_creds/server0.key +++ b/src/core/tsi/test_creds/server0.key diff --git a/src/core/lib/tsi/test_creds/server0.pem b/src/core/tsi/test_creds/server0.pem index ade75d8563..ade75d8563 100644 --- a/src/core/lib/tsi/test_creds/server0.pem +++ b/src/core/tsi/test_creds/server0.pem diff --git a/src/core/lib/tsi/test_creds/server1-openssl.cnf b/src/core/tsi/test_creds/server1-openssl.cnf index 8a02108289..8a02108289 100644 --- a/src/core/lib/tsi/test_creds/server1-openssl.cnf +++ b/src/core/tsi/test_creds/server1-openssl.cnf diff --git a/src/core/lib/tsi/test_creds/server1.key b/src/core/tsi/test_creds/server1.key index 143a5b8765..143a5b8765 100644 --- a/src/core/lib/tsi/test_creds/server1.key +++ b/src/core/tsi/test_creds/server1.key diff --git a/src/core/lib/tsi/test_creds/server1.pem b/src/core/tsi/test_creds/server1.pem index f3d43fcc5b..f3d43fcc5b 100644 --- a/src/core/lib/tsi/test_creds/server1.pem +++ b/src/core/tsi/test_creds/server1.pem diff --git a/src/core/lib/tsi/transport_security.c b/src/core/tsi/transport_security.c index 2cbf381c88..67ebe1b1f3 100644 --- a/src/core/lib/tsi/transport_security.c +++ b/src/core/tsi/transport_security.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/tsi/transport_security.h" +#include "src/core/tsi/transport_security.h" #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> @@ -101,7 +101,7 @@ tsi_result tsi_frame_protector_protect_flush( tsi_frame_protector *self, unsigned char *protected_output_frames, size_t *protected_output_frames_size, size_t *still_pending_size) { if (self == NULL || protected_output_frames == NULL || - protected_output_frames == NULL || still_pending_size == NULL) { + protected_output_frames_size == NULL || still_pending_size == NULL) { return TSI_INVALID_ARGUMENT; } return self->vtable->protect_flush(self, protected_output_frames, diff --git a/src/core/lib/tsi/transport_security.h b/src/core/tsi/transport_security.h index aaf110ee05..491fa1a8bd 100644 --- a/src/core/lib/tsi/transport_security.h +++ b/src/core/tsi/transport_security.h @@ -31,10 +31,10 @@ * */ -#ifndef GRPC_CORE_LIB_TSI_TRANSPORT_SECURITY_H -#define GRPC_CORE_LIB_TSI_TRANSPORT_SECURITY_H +#ifndef GRPC_CORE_TSI_TRANSPORT_SECURITY_H +#define GRPC_CORE_TSI_TRANSPORT_SECURITY_H -#include "src/core/lib/tsi/transport_security_interface.h" +#include "src/core/tsi/transport_security_interface.h" #ifdef __cplusplus extern "C" { @@ -108,4 +108,4 @@ char *tsi_strdup(const char *src); /* Sadly, no strdup in C89. */ } #endif -#endif /* GRPC_CORE_LIB_TSI_TRANSPORT_SECURITY_H */ +#endif /* GRPC_CORE_TSI_TRANSPORT_SECURITY_H */ diff --git a/src/core/lib/tsi/transport_security_interface.h b/src/core/tsi/transport_security_interface.h index 3e8c9d7ffe..caed43eac4 100644 --- a/src/core/lib/tsi/transport_security_interface.h +++ b/src/core/tsi/transport_security_interface.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_TSI_TRANSPORT_SECURITY_INTERFACE_H -#define GRPC_CORE_LIB_TSI_TRANSPORT_SECURITY_INTERFACE_H +#ifndef GRPC_CORE_TSI_TRANSPORT_SECURITY_INTERFACE_H +#define GRPC_CORE_TSI_TRANSPORT_SECURITY_INTERFACE_H #include <stdint.h> #include <stdlib.h> @@ -350,4 +350,4 @@ void tsi_handshaker_destroy(tsi_handshaker *self); } #endif -#endif /* GRPC_CORE_LIB_TSI_TRANSPORT_SECURITY_INTERFACE_H */ +#endif /* GRPC_CORE_TSI_TRANSPORT_SECURITY_INTERFACE_H */ diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc index 253614ca9b..a7b3c2c0da 100644 --- a/src/cpp/common/channel_filter.cc +++ b/src/cpp/common/channel_filter.cc @@ -69,9 +69,9 @@ void ChannelData::GetInfo(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, // CallData -void CallData::StartTransportStreamOp(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - TransportStreamOp *op) { +void CallData::StartTransportStreamOpBatch(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + TransportStreamOpBatch *op) { grpc_call_next_op(exec_ctx, elem, op->op()); } diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index 59dffb5007..8d800b87d9 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -42,7 +42,6 @@ #include <vector> #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/security/context/security_context.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/metadata_batch.h" @@ -141,13 +140,13 @@ class TransportOp { grpc_transport_op *op_; // Not owned. }; -/// A C++ wrapper for the \c grpc_transport_stream_op struct. -class TransportStreamOp { +/// A C++ wrapper for the \c grpc_transport_stream_op_batch struct. +class TransportStreamOpBatch { public: /// Borrows a pointer to \a op, but does NOT take ownership. /// The caller must ensure that \a op continues to exist for as - /// long as the TransportStreamOp object does. - explicit TransportStreamOp(grpc_transport_stream_op *op) + /// long as the TransportStreamOpBatch object does. + explicit TransportStreamOpBatch(grpc_transport_stream_op_batch *op) : op_(op), send_initial_metadata_( op->send_initial_metadata @@ -166,7 +165,7 @@ class TransportStreamOp { ? op->payload->recv_trailing_metadata.recv_trailing_metadata : nullptr) {} - grpc_transport_stream_op *op() const { return op_; } + grpc_transport_stream_op_batch *op() const { return op_; } grpc_closure *on_complete() const { return op_->on_complete; } void set_on_complete(grpc_closure *closure) { op_->on_complete = closure; } @@ -209,24 +208,12 @@ class TransportStreamOp { op_->payload->send_message.send_message = send_message; } - /// To be called only on clients and servers, respectively. - grpc_client_security_context *client_security_context() const { - return (grpc_client_security_context *)op_->payload - ->context[GRPC_CONTEXT_SECURITY] - .value; - } - grpc_server_security_context *server_security_context() const { - return (grpc_server_security_context *)op_->payload - ->context[GRPC_CONTEXT_SECURITY] - .value; - } - census_context *get_census_context() const { return (census_context *)op_->payload->context[GRPC_CONTEXT_TRACING].value; } private: - grpc_transport_stream_op *op_; // Not owned. + grpc_transport_stream_op_batch *op_; // Not owned. MetadataBatch send_initial_metadata_; MetadataBatch send_trailing_metadata_; MetadataBatch recv_initial_metadata_; @@ -270,9 +257,9 @@ class CallData { // TODO(roth): Find a way to avoid passing elem into these methods. /// Starts a new stream operation. - virtual void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - TransportStreamOp *op); + virtual void StartTransportStreamOpBatch(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + TransportStreamOpBatch *op); /// Sets a pollset or pollset set. virtual void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, @@ -342,12 +329,12 @@ class ChannelFilter final { reinterpret_cast<CallDataType *>(elem->call_data)->~CallDataType(); } - static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { + static void StartTransportStreamOpBatch(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { CallDataType *call_data = (CallDataType *)elem->call_data; - TransportStreamOp op_wrapper(op); - call_data->StartTransportStreamOp(exec_ctx, elem, &op_wrapper); + TransportStreamOpBatch op_wrapper(op); + call_data->StartTransportStreamOpBatch(exec_ctx, elem, &op_wrapper); } static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, @@ -399,7 +386,7 @@ void RegisterChannelFilter( stack_type, priority, include_filter, - {FilterType::StartTransportStreamOp, FilterType::StartTransportOp, + {FilterType::StartTransportStreamOpBatch, FilterType::StartTransportOp, FilterType::call_data_size, FilterType::InitCallElement, FilterType::SetPollsetOrPollsetSet, FilterType::DestroyCallElement, FilterType::channel_data_size, FilterType::InitChannelElement, diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 0408a41085..14c51f63c5 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -43,7 +43,12 @@ namespace grpc { static internal::GrpcLibraryInitializer g_gli_initializer; -CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) { +// 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here +// i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create +// a 'grpc_completion_queue' instance (which is being passed as the input to +// this constructor), one must have already called grpc_init(). +CompletionQueue::CompletionQueue(grpc_completion_queue* take) + : GrpcLibraryCodegen(false), cq_(take) { InitialAvalanching(); } diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index e874892e73..ce173a1ee2 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -489,7 +489,9 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) { int Server::AddListeningPort(const grpc::string& addr, ServerCredentials* creds) { GPR_ASSERT(!started_); - return creds->AddPortToServer(addr, server_); + int port = creds->AddPortToServer(addr, server_); + global_callbacks_->AddPort(this, port); + return port; } bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 05c05c8695..3a408eb23e 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -42,6 +42,7 @@ #include <grpc++/support/time.h> #include <grpc/compression.h> #include <grpc/grpc.h> +#include <grpc/load_reporting.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> diff --git a/src/csharp/doc/grpc_csharp_public.shfbproj b/src/csharp/doc/grpc_csharp_public.shfbproj index d9b9749819..fab953da35 100644 --- a/src/csharp/doc/grpc_csharp_public.shfbproj +++ b/src/csharp/doc/grpc_csharp_public.shfbproj @@ -18,8 +18,10 @@ <Language>en-US</Language> <DocumentationSources> <DocumentationSource sourceFile="..\Grpc.Auth\Grpc.Auth.csproj" /> - <DocumentationSource sourceFile="..\Grpc.Core\Grpc.Core.csproj" /> - </DocumentationSources> +<DocumentationSource sourceFile="..\Grpc.Core\Grpc.Core.csproj" /> +<DocumentationSource sourceFile="..\Grpc.HealthCheck\Grpc.HealthCheck.csproj" /> +<DocumentationSource sourceFile="..\Grpc.Reflection\Grpc.Reflection.csproj" /> +<DocumentationSource sourceFile="..\Grpc.Core.Testing\Grpc.Core.Testing.csproj" /></DocumentationSources> <BuildAssemblerVerbosity>OnlyWarningsAndErrors</BuildAssemblerVerbosity> <HelpFileFormat>Website</HelpFileFormat> <IndentHtml>False</IndentHtml> @@ -40,12 +42,13 @@ <HtmlHelpName>Documentation</HtmlHelpName> <NamespaceSummaries> <NamespaceSummaryItem name="Grpc.Auth" isDocumented="True">Provides OAuth2 based authentication for gRPC. <c>Grpc.Auth</c> currently consists of a set of very lightweight wrappers and uses C# <a href="https://www.nuget.org/packages/Google.Apis.Auth/">Google.Apis.Auth</a> library.</NamespaceSummaryItem> -<NamespaceSummaryItem name="Grpc.Core" isDocumented="True">Main namespace for gRPC C# functionality. Contains concepts representing both client side and server side gRPC logic. + <NamespaceSummaryItem name="Grpc.Core" isDocumented="True">Main namespace for gRPC C# functionality. Contains concepts representing both client side and server side gRPC logic. <seealso cref="Grpc.Core.Channel"/> <seealso cref="Grpc.Core.Server"/></NamespaceSummaryItem> -<NamespaceSummaryItem name="Grpc.Core.Logging" isDocumented="True">Provides functionality to redirect gRPC logs to application-specified destination.</NamespaceSummaryItem> -<NamespaceSummaryItem name="Grpc.Core.Utils" isDocumented="True">Various utilities for gRPC C#.</NamespaceSummaryItem></NamespaceSummaries> + <NamespaceSummaryItem name="Grpc.Core.Logging" isDocumented="True">Provides functionality to redirect gRPC logs to application-specified destination.</NamespaceSummaryItem> + <NamespaceSummaryItem name="Grpc.Core.Utils" isDocumented="True">Various utilities for gRPC C#.</NamespaceSummaryItem> + </NamespaceSummaries> <MissingTags>Summary, Parameter, AutoDocumentCtors, Namespace, TypeParameter, AutoDocumentDispose</MissingTags> </PropertyGroup> <!-- There are no properties for these groups. AnyCPU needs to appear in order for Visual Studio to perform diff --git a/src/objective-c/tests/build_example_test.sh b/src/objective-c/tests/build_example_test.sh index ae75941ec6..5c50e83110 100755 --- a/src/objective-c/tests/build_example_test.sh +++ b/src/objective-c/tests/build_example_test.sh @@ -35,29 +35,38 @@ set -evo pipefail cd `dirname $0` +trap 'echo "EXIT TIME: $(date)"' EXIT + +echo "TIME: $(date)" SCHEME=HelloWorld \ EXAMPLE_PATH=examples/objective-c/helloworld \ ./build_one_example.sh +echo "TIME: $(date)" SCHEME=RouteGuideClient \ EXAMPLE_PATH=examples/objective-c/route_guide \ ./build_one_example.sh +echo "TIME: $(date)" SCHEME=AuthSample \ EXAMPLE_PATH=examples/objective-c/auth_sample \ ./build_one_example.sh rm -f ../examples/RemoteTestClient/*.{h,m} +echo "TIME: $(date)" SCHEME=Sample \ EXAMPLE_PATH=src/objective-c/examples/Sample \ ./build_one_example.sh +echo "TIME: $(date)" SCHEME=Sample \ EXAMPLE_PATH=src/objective-c/examples/Sample \ FRAMEWORKS=YES \ ./build_one_example.sh +echo "TIME: $(date)" SCHEME=SwiftSample \ EXAMPLE_PATH=src/objective-c/examples/SwiftSample \ ./build_one_example.sh + diff --git a/src/objective-c/tests/build_tests.sh b/src/objective-c/tests/build_tests.sh index bc5bc04494..6602d510d9 100755 --- a/src/objective-c/tests/build_tests.sh +++ b/src/objective-c/tests/build_tests.sh @@ -50,4 +50,5 @@ rm -rf Tests.xcworkspace rm -f Podfile.lock rm -f RemoteTestClient/*.{h,m} +echo "TIME: $(date)" pod install diff --git a/src/objective-c/tests/run_tests.sh b/src/objective-c/tests/run_tests.sh index d217f1c8e0..bd7c2945a2 100755 --- a/src/objective-c/tests/run_tests.sh +++ b/src/objective-c/tests/run_tests.sh @@ -47,31 +47,35 @@ BINDIR=../../../bins/$CONFIG $BINDIR/interop_server --port=5050 --max_send_message_size=8388608 & $BINDIR/interop_server --port=5051 --max_send_message_size=8388608 --use_tls & # Kill them when this script exits. -trap 'kill -9 `jobs -p`' EXIT +trap 'kill -9 `jobs -p` ; echo "EXIT TIME: $(date)"' EXIT # xcodebuild is very verbose. We filter its output and tell Bash to fail if any # element of the pipe fails. # TODO(jcanizales): Use xctool instead? Issue #2540. set -o pipefail XCODEBUILD_FILTER='(^===|^\*\*|\bfatal\b|\berror\b|\bwarning\b|\bfail)' +echo "TIME: $(date)" xcodebuild \ -workspace Tests.xcworkspace \ -scheme AllTests \ -destination name="iPhone 6" \ test | xcpretty +echo "TIME: $(date)" xcodebuild \ -workspace Tests.xcworkspace \ -scheme CoreCronetEnd2EndTests \ -destination name="iPhone 6" \ test | xcpretty +echo "TIME: $(date)" xcodebuild \ -workspace Tests.xcworkspace \ -scheme CronetUnitTests \ -destination name="iPhone 6" \ test | xcpretty +echo "TIME: $(date)" xcodebuild \ -workspace Tests.xcworkspace \ -scheme InteropTestsRemoteWithCronet \ diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 8f0d25c2c9..acee86678d 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -113,6 +113,9 @@ message ClientConfig { string other_client_api = 15; repeated ChannelArg channel_args = 16; + + // Number of messages on a stream before it gets finished/restarted + int32 messages_per_stream = 18; } message ClientStatus { ClientStats stats = 1; } diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 258c65c08e..ed8793b019 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -88,6 +88,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/channel/handshaker_registry.c', 'src/core/lib/channel/http_client_filter.c', 'src/core/lib/channel/http_server_filter.c', + 'src/core/lib/channel/max_age_filter.c', 'src/core/lib/channel/message_size_filter.c', 'src/core/lib/compression/compression.c', 'src/core/lib/compression/message_compress.c', @@ -162,7 +163,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/json/json_reader.c', 'src/core/lib/json/json_string.c', 'src/core/lib/json/json_writer.c', - 'src/core/lib/security/util/b64.c', + 'src/core/lib/slice/b64.c', 'src/core/lib/slice/percent_encoding.c', 'src/core/lib/slice/slice.c', 'src/core/lib/slice/slice_buffer.c', @@ -247,9 +248,9 @@ CORE_SOURCE_FILES = [ 'src/core/lib/security/transport/tsi_error.c', 'src/core/lib/security/util/json_util.c', 'src/core/lib/surface/init_secure.c', - 'src/core/lib/tsi/fake_transport_security.c', - 'src/core/lib/tsi/ssl_transport_security.c', - 'src/core/lib/tsi/transport_security.c', + 'src/core/tsi/fake_transport_security.c', + 'src/core/tsi/ssl_transport_security.c', + 'src/core/tsi/transport_security.c', 'src/core/ext/transport/chttp2/server/chttp2_server.c', 'src/core/ext/transport/chttp2/client/secure/secure_channel_create.c', 'src/core/ext/client_channel/channel_connectivity.c', |