From e0341792d5a33469bbac481e9737bd00e3cb8900 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 8 Mar 2017 15:59:16 -0800 Subject: Remove most usage of the grpc_call lock --- src/core/lib/surface/call.c | 77 +++++++++++++++++---------------------------- 1 file changed, 29 insertions(+), 48 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index c2547c5147..9b01c46bd5 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -143,9 +143,10 @@ struct grpc_call { grpc_channel *channel; grpc_call *parent; grpc_call *first_child; + gpr_atm has_children; gpr_timespec start_time; - /* TODO(ctiller): share with cq if possible? */ - gpr_mu mu; + /* protects first_child, setting has_children, and child next/prev links */ + gpr_mu child_list_mu; /* client or server call */ bool is_client; @@ -161,7 +162,7 @@ struct grpc_call { bool receiving_message; bool requested_final_op; bool received_final_op; - bool sent_any_op; + gpr_atm num_ops_sent; /* have we received initial metadata */ bool has_initial_md_been_received; @@ -275,7 +276,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("grpc_call_create", 0); call = gpr_zalloc(sizeof(grpc_call) + channel_stack->call_stack_size); *out_call = call; - gpr_mu_init(&call->mu); + gpr_mu_init(&call->child_list_mu); call->channel = args->channel; call->cq = args->cq; call->parent = args->parent_call; @@ -313,7 +314,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(call->is_client); GPR_ASSERT(!args->parent_call->is_client); - gpr_mu_lock(&args->parent_call->mu); + gpr_mu_lock(&args->parent_call->child_list_mu); + gpr_atm_no_barrier_store(&args->parent_call->has_children, 1); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( @@ -353,7 +355,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, call; } - gpr_mu_unlock(&args->parent_call->mu); + gpr_mu_unlock(&args->parent_call->child_list_mu); } call->send_deadline = send_deadline; @@ -435,7 +437,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } - gpr_mu_destroy(&c->mu); + gpr_mu_destroy(&c->child_list_mu); for (ii = 0; ii < c->send_extra_metadata_count; ii++) { GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md); } @@ -473,7 +475,7 @@ void grpc_call_destroy(grpc_call *c) { GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); if (parent) { - gpr_mu_lock(&parent->mu); + gpr_mu_lock(&parent->child_list_mu); if (c == parent->first_child) { parent->first_child = c->sibling_next; if (c == parent->first_child) { @@ -482,15 +484,13 @@ void grpc_call_destroy(grpc_call *c) { c->sibling_prev->sibling_next = c->sibling_next; c->sibling_next->sibling_prev = c->sibling_prev; } - gpr_mu_unlock(&parent->mu); + gpr_mu_unlock(&parent->child_list_mu); GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); } - gpr_mu_lock(&c->mu); GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; - cancel = c->sent_any_op && !c->received_final_op; - gpr_mu_unlock(&c->mu); + cancel = gpr_atm_no_barrier_load(&c->num_ops_sent) && !c->received_final_op; if (cancel) { cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); @@ -555,10 +555,8 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, "c=%p, status=%d, description=%s, reserved=%p)", 4, (c, (int)status, description, reserved)); GPR_ASSERT(reserved == NULL); - gpr_mu_lock(&c->mu); cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status, description); - gpr_mu_unlock(&c->mu); grpc_exec_ctx_finish(&exec_ctx); return GRPC_CALL_OK; } @@ -715,9 +713,7 @@ static void set_incoming_compression_algorithm( grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( grpc_call *call) { grpc_compression_algorithm algorithm; - gpr_mu_lock(&call->mu); algorithm = call->incoming_compression_algorithm; - gpr_mu_unlock(&call->mu); return algorithm; } @@ -729,9 +725,7 @@ static grpc_compression_algorithm compression_algorithm_for_level_locked( uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) { uint32_t flags; - gpr_mu_lock(&call->mu); flags = call->test_only_last_message_flags; - gpr_mu_unlock(&call->mu); return flags; } @@ -785,9 +779,7 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { uint32_t encodings_accepted_by_peer; - gpr_mu_lock(&call->mu); encodings_accepted_by_peer = call->encodings_accepted_by_peer; - gpr_mu_unlock(&call->mu); return encodings_accepted_by_peer; } @@ -1083,8 +1075,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, grpc_call *call = bctl->call; grpc_error *error = consolidate_batch_errors(bctl); - gpr_mu_lock(&call->mu); - if (bctl->send_initial_metadata) { grpc_metadata_batch_destroy( exec_ctx, @@ -1105,17 +1095,21 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, call->received_final_op = true; /* propagate cancellation to any interested children */ - child_call = call->first_child; - if (child_call != NULL) { - do { - next_child_call = child_call->sibling_next; - if (child_call->cancellation_is_inherited) { - GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - grpc_call_cancel(child_call, NULL); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); - } - child_call = next_child_call; - } while (child_call != call->first_child); + if (gpr_atm_no_barrier_load(&call->has_children)) { + gpr_mu_lock(&call->child_list_mu); + child_call = call->first_child; + if (child_call != NULL) { + do { + next_child_call = child_call->sibling_next; + if (child_call->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); + grpc_call_cancel(child_call, NULL); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); + } + child_call = next_child_call; + } while (child_call != call->first_child); + } + gpr_mu_unlock(&call->child_list_mu); } if (call->is_client) { @@ -1130,7 +1124,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } - gpr_mu_unlock(&call->mu); if (bctl->is_notify_tag_closure) { /* unrefs bctl->error */ @@ -1221,7 +1214,6 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; - gpr_mu_lock(&bctl->call->mu); if (error != GRPC_ERROR_NONE) { if (call->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); @@ -1233,11 +1225,9 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE || call->receiving_stream == NULL) { - gpr_mu_unlock(&bctl->call->mu); process_data_after_md(exec_ctx, bctlp); } else { call->saved_receiving_stream_ready_bctlp = bctlp; - gpr_mu_unlock(&bctl->call->mu); } } @@ -1309,8 +1299,6 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, batch_control *bctl = bctlp; grpc_call *call = bctl->call; - gpr_mu_lock(&call->mu); - add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch *md = @@ -1336,11 +1324,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, receiving_stream_ready, call->saved_receiving_stream_ready_bctlp, grpc_schedule_on_exec_ctx); call->saved_receiving_stream_ready_bctlp = NULL; - grpc_closure_sched(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); + grpc_closure_run(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); } - gpr_mu_unlock(&call->mu); - finish_batch_step(exec_ctx, bctl); } @@ -1393,7 +1379,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->notify_tag = notify_tag; bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); - gpr_mu_lock(&call->mu); grpc_transport_stream_op *stream_op = &bctl->op; memset(stream_op, 0, sizeof(*stream_op)); stream_op->covered_by_poller = true; @@ -1679,8 +1664,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_closure_init(&bctl->finish_batch, finish_batch, bctl, grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch; - call->sent_any_op = true; - gpr_mu_unlock(&call->mu); + gpr_atm_no_barrier_fetch_add(&call->num_ops_sent, 1); execute_op(exec_ctx, call, stream_op); @@ -1711,7 +1695,6 @@ done_with_error: if (bctl->recv_final_op) { call->requested_final_op = 0; } - gpr_mu_unlock(&call->mu); goto done; } @@ -1760,10 +1743,8 @@ uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; } grpc_compression_algorithm grpc_call_compression_for_level( grpc_call *call, grpc_compression_level level) { - gpr_mu_lock(&call->mu); grpc_compression_algorithm algo = compression_algorithm_for_level_locked(call, level); - gpr_mu_unlock(&call->mu); return algo; } -- cgit v1.2.3 From 676db291d2387603f795dd6bd5853188e0ff7f63 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 8 Mar 2017 16:10:56 -0800 Subject: Fix barriers --- src/core/lib/surface/call.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 9b01c46bd5..2f3ddd6bcb 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -315,7 +315,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!args->parent_call->is_client); gpr_mu_lock(&args->parent_call->child_list_mu); - gpr_atm_no_barrier_store(&args->parent_call->has_children, 1); + gpr_atm_rel_store(&args->parent_call->has_children, 1); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( @@ -1095,7 +1095,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, call->received_final_op = true; /* propagate cancellation to any interested children */ - if (gpr_atm_no_barrier_load(&call->has_children)) { + if (gpr_atm_acq_load(&call->has_children)) { gpr_mu_lock(&call->child_list_mu); child_call = call->first_child; if (child_call != NULL) { -- cgit v1.2.3 From b597dcf53a5057425ba24a7c8dbed2f65f8c31f4 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Mar 2017 07:02:11 -0800 Subject: Race fixes --- src/core/lib/surface/call.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 2f3ddd6bcb..eafeef0a20 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -161,9 +161,9 @@ struct grpc_call { bool received_initial_metadata; bool receiving_message; bool requested_final_op; - bool received_final_op; - gpr_atm num_ops_sent; - + gpr_atm any_ops_sent_atm; + gpr_atm received_final_op_atm; + /* have we received initial metadata */ bool has_initial_md_been_received; @@ -458,7 +458,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, for (i = 0; i < STATUS_SOURCE_COUNT; i++) { GRPC_ERROR_UNREF( - unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error); + unpack_received_status(gpr_atm_acq_load(&c->status[i])).error); } grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); @@ -490,7 +490,7 @@ void grpc_call_destroy(grpc_call *c) { GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; - cancel = gpr_atm_no_barrier_load(&c->num_ops_sent) && !c->received_final_op; + cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) && !gpr_atm_acq_load(&c->received_final_op_atm); if (cancel) { cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); @@ -1048,7 +1048,7 @@ static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, } static grpc_error *consolidate_batch_errors(batch_control *bctl) { - size_t n = (size_t)gpr_atm_no_barrier_load(&bctl->num_errors); + size_t n = (size_t)gpr_atm_acq_load(&bctl->num_errors); if (n == 0) { return GRPC_ERROR_NONE; } else if (n == 1) { @@ -1093,7 +1093,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; recv_trailing_filter(exec_ctx, call, md); - call->received_final_op = true; + gpr_atm_rel_store(&call->received_final_op_atm, 1); /* propagate cancellation to any interested children */ if (gpr_atm_acq_load(&call->has_children)) { gpr_mu_lock(&call->child_list_mu); @@ -1286,7 +1286,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, grpc_error *error, bool has_cancelled) { if (error == GRPC_ERROR_NONE) return; - int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1); + int idx = (int)gpr_atm_full_fetch_add(&bctl->num_errors, 1); if (idx == 0 && !has_cancelled) { cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error)); @@ -1664,7 +1664,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_closure_init(&bctl->finish_batch, finish_batch, bctl, grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch; - gpr_atm_no_barrier_fetch_add(&call->num_ops_sent, 1); + gpr_atm_rel_store(&call->any_ops_sent_atm, 1); execute_op(exec_ctx, call, stream_op); -- cgit v1.2.3 From 123c72bc3c7594e31620e603dce4322c4d7c57b1 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Mar 2017 07:33:27 -0800 Subject: Handle a race between cancellation of a parent call and adding a child --- src/core/lib/surface/call.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index eafeef0a20..01efcafceb 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -163,7 +163,7 @@ struct grpc_call { bool requested_final_op; gpr_atm any_ops_sent_atm; gpr_atm received_final_op_atm; - + /* have we received initial metadata */ bool has_initial_md_been_received; @@ -343,6 +343,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, } if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; + if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) { + cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); + } } if (args->parent_call->first_child == NULL) { @@ -490,7 +494,8 @@ void grpc_call_destroy(grpc_call *c) { GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; - cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) && !gpr_atm_acq_load(&c->received_final_op_atm); + cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) && + !gpr_atm_acq_load(&c->received_final_op_atm); if (cancel) { cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); @@ -1103,7 +1108,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, next_child_call = child_call->sibling_next; if (child_call->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - grpc_call_cancel(child_call, NULL); + cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); } child_call = next_child_call; -- cgit v1.2.3 From c5b90df9b2738076acf6a6972d833489f6470335 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Mar 2017 16:11:08 -0800 Subject: debug --- src/core/lib/surface/call.c | 74 ++++++++++++-------------------------- src/core/lib/transport/transport.c | 3 +- 2 files changed, 24 insertions(+), 53 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 01efcafceb..8b2f9fd3f5 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -143,9 +143,8 @@ struct grpc_call { grpc_channel *channel; grpc_call *parent; grpc_call *first_child; - gpr_atm has_children; gpr_timespec start_time; - /* protects first_child, setting has_children, and child next/prev links */ + /* protects first_child, and child next/prev links */ gpr_mu child_list_mu; /* client or server call */ @@ -315,7 +314,6 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!args->parent_call->is_client); gpr_mu_lock(&args->parent_call->child_list_mu); - gpr_atm_rel_store(&args->parent_call->has_children, 1); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( @@ -566,45 +564,19 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, return GRPC_CALL_OK; } -typedef struct termination_closure { - grpc_closure closure; - grpc_call *call; - grpc_transport_stream_op op; -} termination_closure; - -static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, - grpc_error *error) { - termination_closure *tc = tcp; - GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "termination"); - gpr_free(tc); -} - -static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp, +static void done_termination(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { - termination_closure *tc = tcp; - memset(&tc->op, 0, sizeof(tc->op)); - tc->op.cancel_error = GRPC_ERROR_REF(error); - /* reuse closure to catch completion */ - tc->op.on_complete = grpc_closure_init(&tc->closure, done_termination, tc, - grpc_schedule_on_exec_ctx); - execute_op(exec_ctx, tc->call, &tc->op); -} - -static void terminate_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_error *error) { - termination_closure *tc = gpr_malloc(sizeof(*tc)); - memset(tc, 0, sizeof(*tc)); - tc->call = c; - GRPC_CALL_INTERNAL_REF(tc->call, "termination"); - grpc_closure_sched(exec_ctx, grpc_closure_init(&tc->closure, send_termination, - tc, grpc_schedule_on_exec_ctx), - error); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination"); } static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_error *error) { + GRPC_CALL_INTERNAL_REF(c, "termination"); set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); - terminate_with_error(exec_ctx, c, error); + grpc_transport_stream_op *op = grpc_make_transport_stream_op( + grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx)); + op->cancel_error = error; + execute_op(exec_ctx, c, op); } static grpc_error *error_from_status(grpc_status_code status, @@ -1100,23 +1072,21 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, gpr_atm_rel_store(&call->received_final_op_atm, 1); /* propagate cancellation to any interested children */ - if (gpr_atm_acq_load(&call->has_children)) { - gpr_mu_lock(&call->child_list_mu); - child_call = call->first_child; - if (child_call != NULL) { - do { - next_child_call = child_call->sibling_next; - if (child_call->cancellation_is_inherited) { - GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_CANCELLED); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); - } - child_call = next_child_call; - } while (child_call != call->first_child); - } - gpr_mu_unlock(&call->child_list_mu); + gpr_mu_lock(&call->child_list_mu); + child_call = call->first_child; + if (child_call != NULL) { + do { + next_child_call = child_call->sibling_next; + if (child_call->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); + cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); + } + child_call = next_child_call; + } while (child_call != call->first_child); } + gpr_mu_unlock(&call->child_list_mu); if (call->is_client) { get_final_status(call, set_status_value_directly, diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 165950e288..3024f2ae78 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -254,8 +254,9 @@ typedef struct { static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { made_transport_stream_op *op = arg; - grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error)); + grpc_closure *c = op->inner_on_complete; gpr_free(op); + grpc_closure_run(exec_ctx, c, GRPC_ERROR_REF(error)); } grpc_transport_stream_op *grpc_make_transport_stream_op( -- cgit v1.2.3 From 9202b3fdfdaf890752d7472692296490ec55c750 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sun, 12 Mar 2017 22:30:38 -0700 Subject: Arena allocator for grpc --- CMakeLists.txt | 69 ++++++++ Makefile | 85 +++++++++ binding.gyp | 1 + build.yaml | 30 ++++ config.m4 | 1 + gRPC-Core.podspec | 3 + grpc.gemspec | 2 + package.xml | 2 + src/core/lib/support/arena.c | 90 ++++++++++ src/core/lib/support/arena.h | 54 ++++++ src/python/grpcio/grpc_core_dependencies.py | 1 + test/core/support/arena_test.c | 91 ++++++++++ test/cpp/microbenchmarks/bm_arena.cc | 69 ++++++++ tools/doxygen/Doxyfile.core.internal | 2 + tools/run_tests/generated/sources_and_headers.json | 39 +++++ tools/run_tests/generated/tests.json | 44 +++++ vsprojects/buildtests_c.sln | 25 +++ vsprojects/vcxproj/gpr/gpr.vcxproj | 3 + vsprojects/vcxproj/gpr/gpr.vcxproj.filters | 6 + .../vcxproj/test/arena_test/arena_test.vcxproj | 193 +++++++++++++++++++++ .../test/arena_test/arena_test.vcxproj.filters | 21 +++ 21 files changed, 831 insertions(+) create mode 100644 src/core/lib/support/arena.c create mode 100644 src/core/lib/support/arena.h create mode 100644 test/core/support/arena_test.c create mode 100644 test/cpp/microbenchmarks/bm_arena.cc create mode 100644 vsprojects/vcxproj/test/arena_test/arena_test.vcxproj create mode 100644 vsprojects/vcxproj/test/arena_test/arena_test.vcxproj.filters (limited to 'src/core/lib') diff --git a/CMakeLists.txt b/CMakeLists.txt index d0a65b4493..faa7d528e8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -332,6 +332,7 @@ add_dependencies(buildtests_c alarm_test) add_dependencies(buildtests_c algorithm_test) add_dependencies(buildtests_c alloc_test) add_dependencies(buildtests_c alpn_test) +add_dependencies(buildtests_c arena_test) add_dependencies(buildtests_c bad_server_response_test) add_dependencies(buildtests_c bdp_estimator_test) add_dependencies(buildtests_c bin_decoder_test) @@ -574,6 +575,9 @@ add_dependencies(buildtests_cxx alarm_cpp_test) add_dependencies(buildtests_cxx async_end2end_test) add_dependencies(buildtests_cxx auth_property_iterator_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) +add_dependencies(buildtests_cxx bm_arena) +endif() +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx bm_call_create) endif() if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) @@ -690,6 +694,7 @@ add_library(gpr src/core/lib/profiling/basic_timers.c src/core/lib/profiling/stap_timers.c src/core/lib/support/alloc.c + src/core/lib/support/arena.c src/core/lib/support/avl.c src/core/lib/support/backoff.c src/core/lib/support/cmdline.c @@ -4112,6 +4117,31 @@ target_link_libraries(alpn_test endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) +add_executable(arena_test + test/core/support/arena_test.c +) + + +target_include_directories(arena_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${BORINGSSL_ROOT_DIR}/include + PRIVATE ${PROTOBUF_ROOT_DIR}/src + PRIVATE ${BENCHMARK_ROOT_DIR}/include + PRIVATE ${ZLIB_ROOT_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include +) + +target_link_libraries(arena_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gpr_test_util + gpr +) + +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + add_executable(bad_server_response_test test/core/end2end/bad_server_response_test.c ) @@ -7626,6 +7656,45 @@ endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) +add_executable(bm_arena + test/cpp/microbenchmarks/bm_arena.cc + third_party/googletest/src/gtest-all.cc +) + + +target_include_directories(bm_arena + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${BORINGSSL_ROOT_DIR}/include + PRIVATE ${PROTOBUF_ROOT_DIR}/src + PRIVATE ${BENCHMARK_ROOT_DIR}/include + PRIVATE ${ZLIB_ROOT_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include + PRIVATE third_party/googletest/include + PRIVATE third_party/googletest + PRIVATE ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(bm_arena + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_benchmark + benchmark + grpc++_test_util + grpc_test_util + grpc++ + grpc + gpr_test_util + gpr + ${_gRPC_GFLAGS_LIBRARIES} +) + +endif() +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_executable(bm_call_create test/cpp/microbenchmarks/bm_call_create.cc third_party/googletest/src/gtest-all.cc diff --git a/Makefile b/Makefile index a9242dddbd..c6203768ad 100644 --- a/Makefile +++ b/Makefile @@ -906,6 +906,7 @@ algorithm_test: $(BINDIR)/$(CONFIG)/algorithm_test alloc_test: $(BINDIR)/$(CONFIG)/alloc_test alpn_test: $(BINDIR)/$(CONFIG)/alpn_test api_fuzzer: $(BINDIR)/$(CONFIG)/api_fuzzer +arena_test: $(BINDIR)/$(CONFIG)/arena_test bad_server_response_test: $(BINDIR)/$(CONFIG)/bad_server_response_test bdp_estimator_test: $(BINDIR)/$(CONFIG)/bdp_estimator_test bin_decoder_test: $(BINDIR)/$(CONFIG)/bin_decoder_test @@ -1047,6 +1048,7 @@ wakeup_fd_cv_test: $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test alarm_cpp_test: $(BINDIR)/$(CONFIG)/alarm_cpp_test async_end2end_test: $(BINDIR)/$(CONFIG)/async_end2end_test auth_property_iterator_test: $(BINDIR)/$(CONFIG)/auth_property_iterator_test +bm_arena: $(BINDIR)/$(CONFIG)/bm_arena bm_call_create: $(BINDIR)/$(CONFIG)/bm_call_create bm_chttp2_hpack: $(BINDIR)/$(CONFIG)/bm_chttp2_hpack bm_closure: $(BINDIR)/$(CONFIG)/bm_closure @@ -1285,6 +1287,7 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/algorithm_test \ $(BINDIR)/$(CONFIG)/alloc_test \ $(BINDIR)/$(CONFIG)/alpn_test \ + $(BINDIR)/$(CONFIG)/arena_test \ $(BINDIR)/$(CONFIG)/bad_server_response_test \ $(BINDIR)/$(CONFIG)/bdp_estimator_test \ $(BINDIR)/$(CONFIG)/bin_decoder_test \ @@ -1468,6 +1471,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/alarm_cpp_test \ $(BINDIR)/$(CONFIG)/async_end2end_test \ $(BINDIR)/$(CONFIG)/auth_property_iterator_test \ + $(BINDIR)/$(CONFIG)/bm_arena \ $(BINDIR)/$(CONFIG)/bm_call_create \ $(BINDIR)/$(CONFIG)/bm_chttp2_hpack \ $(BINDIR)/$(CONFIG)/bm_closure \ @@ -1583,6 +1587,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/alarm_cpp_test \ $(BINDIR)/$(CONFIG)/async_end2end_test \ $(BINDIR)/$(CONFIG)/auth_property_iterator_test \ + $(BINDIR)/$(CONFIG)/bm_arena \ $(BINDIR)/$(CONFIG)/bm_call_create \ $(BINDIR)/$(CONFIG)/bm_chttp2_hpack \ $(BINDIR)/$(CONFIG)/bm_closure \ @@ -1663,6 +1668,8 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/alloc_test || ( echo test alloc_test failed ; exit 1 ) $(E) "[RUN] Testing alpn_test" $(Q) $(BINDIR)/$(CONFIG)/alpn_test || ( echo test alpn_test failed ; exit 1 ) + $(E) "[RUN] Testing arena_test" + $(Q) $(BINDIR)/$(CONFIG)/arena_test || ( echo test arena_test failed ; exit 1 ) $(E) "[RUN] Testing bad_server_response_test" $(Q) $(BINDIR)/$(CONFIG)/bad_server_response_test || ( echo test bad_server_response_test failed ; exit 1 ) $(E) "[RUN] Testing bdp_estimator_test" @@ -1923,6 +1930,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/async_end2end_test || ( echo test async_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing auth_property_iterator_test" $(Q) $(BINDIR)/$(CONFIG)/auth_property_iterator_test || ( echo test auth_property_iterator_test failed ; exit 1 ) + $(E) "[RUN] Testing bm_arena" + $(Q) $(BINDIR)/$(CONFIG)/bm_arena || ( echo test bm_arena failed ; exit 1 ) $(E) "[RUN] Testing bm_call_create" $(Q) $(BINDIR)/$(CONFIG)/bm_call_create || ( echo test bm_call_create failed ; exit 1 ) $(E) "[RUN] Testing bm_chttp2_hpack" @@ -2594,6 +2603,7 @@ LIBGPR_SRC = \ src/core/lib/profiling/basic_timers.c \ src/core/lib/profiling/stap_timers.c \ src/core/lib/support/alloc.c \ + src/core/lib/support/arena.c \ src/core/lib/support/avl.c \ src/core/lib/support/backoff.c \ src/core/lib/support/cmdline.c \ @@ -8094,6 +8104,38 @@ endif endif +ARENA_TEST_SRC = \ + test/core/support/arena_test.c \ + +ARENA_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(ARENA_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/arena_test: openssl_dep_error + +else + + + +$(BINDIR)/$(CONFIG)/arena_test: $(ARENA_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LD) $(LDFLAGS) $(ARENA_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/arena_test + +endif + +$(OBJDIR)/$(CONFIG)/test/core/support/arena_test.o: $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_arena_test: $(ARENA_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(ARENA_TEST_OBJS:.o=.dep) +endif +endif + + BAD_SERVER_RESPONSE_TEST_SRC = \ test/core/end2end/bad_server_response_test.c \ @@ -12639,6 +12681,49 @@ endif endif +BM_ARENA_SRC = \ + test/cpp/microbenchmarks/bm_arena.cc \ + +BM_ARENA_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BM_ARENA_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/bm_arena: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/bm_arena: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/bm_arena: $(PROTOBUF_DEP) $(BM_ARENA_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_benchmark.a $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(BM_ARENA_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_benchmark.a $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/bm_arena + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/microbenchmarks/bm_arena.o: $(LIBDIR)/$(CONFIG)/libgrpc_benchmark.a $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_bm_arena: $(BM_ARENA_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(BM_ARENA_OBJS:.o=.dep) +endif +endif + + BM_CALL_CREATE_SRC = \ test/cpp/microbenchmarks/bm_call_create.cc \ diff --git a/binding.gyp b/binding.gyp index c521a27c30..59b6b600e9 100644 --- a/binding.gyp +++ b/binding.gyp @@ -544,6 +544,7 @@ 'src/core/lib/profiling/basic_timers.c', 'src/core/lib/profiling/stap_timers.c', 'src/core/lib/support/alloc.c', + 'src/core/lib/support/arena.c', 'src/core/lib/support/avl.c', 'src/core/lib/support/backoff.c', 'src/core/lib/support/cmdline.c', diff --git a/build.yaml b/build.yaml index a32a8a06d3..e799928c04 100644 --- a/build.yaml +++ b/build.yaml @@ -85,6 +85,7 @@ filegroups: - include/grpc/support/useful.h headers: - src/core/lib/profiling/timers.h + - src/core/lib/support/arena.h - src/core/lib/support/backoff.h - src/core/lib/support/block_annotate.h - src/core/lib/support/env.h @@ -101,6 +102,7 @@ filegroups: - src/core/lib/profiling/basic_timers.c - src/core/lib/profiling/stap_timers.c - src/core/lib/support/alloc.c + - src/core/lib/support/arena.c - src/core/lib/support/avl.c - src/core/lib/support/backoff.c - src/core/lib/support/cmdline.c @@ -1483,6 +1485,14 @@ targets: - test/core/end2end/fuzzers/api_fuzzer_corpus dict: test/core/end2end/fuzzers/api_fuzzer.dictionary maxlen: 2048 +- name: arena_test + build: test + language: c + src: + - test/core/support/arena_test.c + deps: + - gpr_test_util + - gpr - name: bad_server_response_test build: test language: c @@ -3052,6 +3062,26 @@ targets: - grpc - gpr_test_util - gpr +- name: bm_arena + build: test + language: c++ + src: + - test/cpp/microbenchmarks/bm_arena.cc + deps: + - grpc_benchmark + - benchmark + - grpc++_test_util + - grpc_test_util + - grpc++ + - grpc + - gpr_test_util + - gpr + args: + - --benchmark_min_time=0 + platforms: + - mac + - linux + - posix - name: bm_call_create build: test language: c++ diff --git a/config.m4 b/config.m4 index 90536e503e..f43a031638 100644 --- a/config.m4 +++ b/config.m4 @@ -39,6 +39,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/profiling/basic_timers.c \ src/core/lib/profiling/stap_timers.c \ src/core/lib/support/alloc.c \ + src/core/lib/support/arena.c \ src/core/lib/support/avl.c \ src/core/lib/support/backoff.c \ src/core/lib/support/cmdline.c \ diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 027babcda4..8a5dcbb99c 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -196,6 +196,7 @@ Pod::Spec.new do |s| # To save you from scrolling, this is the last part of the podspec. ss.source_files = 'src/core/lib/profiling/timers.h', + 'src/core/lib/support/arena.h', 'src/core/lib/support/backoff.h', 'src/core/lib/support/block_annotate.h', 'src/core/lib/support/env.h', @@ -211,6 +212,7 @@ Pod::Spec.new do |s| 'src/core/lib/profiling/basic_timers.c', 'src/core/lib/profiling/stap_timers.c', 'src/core/lib/support/alloc.c', + 'src/core/lib/support/arena.c', 'src/core/lib/support/avl.c', 'src/core/lib/support/backoff.c', 'src/core/lib/support/cmdline.c', @@ -675,6 +677,7 @@ Pod::Spec.new do |s| 'src/core/plugin_registry/grpc_plugin_registry.c' ss.private_header_files = 'src/core/lib/profiling/timers.h', + 'src/core/lib/support/arena.h', 'src/core/lib/support/backoff.h', 'src/core/lib/support/block_annotate.h', 'src/core/lib/support/env.h', diff --git a/grpc.gemspec b/grpc.gemspec index 8d5b7b2ab1..825c23d7a8 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -82,6 +82,7 @@ Gem::Specification.new do |s| s.files += %w( include/grpc/impl/codegen/sync_posix.h ) s.files += %w( include/grpc/impl/codegen/sync_windows.h ) s.files += %w( src/core/lib/profiling/timers.h ) + s.files += %w( src/core/lib/support/arena.h ) s.files += %w( src/core/lib/support/backoff.h ) s.files += %w( src/core/lib/support/block_annotate.h ) s.files += %w( src/core/lib/support/env.h ) @@ -97,6 +98,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/profiling/basic_timers.c ) s.files += %w( src/core/lib/profiling/stap_timers.c ) s.files += %w( src/core/lib/support/alloc.c ) + s.files += %w( src/core/lib/support/arena.c ) s.files += %w( src/core/lib/support/avl.c ) s.files += %w( src/core/lib/support/backoff.c ) s.files += %w( src/core/lib/support/cmdline.c ) diff --git a/package.xml b/package.xml index 4167bef26e..67669286c3 100644 --- a/package.xml +++ b/package.xml @@ -91,6 +91,7 @@ + @@ -106,6 +107,7 @@ + diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c new file mode 100644 index 0000000000..a5b0be4d48 --- /dev/null +++ b/src/core/lib/support/arena.c @@ -0,0 +1,90 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/support/arena.h" +#include +#include +#include + +typedef struct zone { + size_t size_begin; + size_t size_end; + gpr_atm next_atm; +} zone; + +struct gpr_arena { + gpr_atm size_so_far; + zone initial_zone; +}; + +gpr_arena *gpr_arena_create(size_t initial_size) { + gpr_arena *a = gpr_zalloc(sizeof(gpr_arena) + initial_size); + a->initial_zone.size_end = initial_size; + return a; +} + +size_t gpr_arena_destroy(gpr_arena *arena) { + gpr_atm size = gpr_atm_no_barrier_load(&arena->size_so_far); + zone *z = (zone *)gpr_atm_no_barrier_load(&arena->initial_zone.next_atm); + gpr_free(arena); + while (z) { + zone *next_z = (zone *)gpr_atm_no_barrier_load(&z->next_atm); + gpr_free(z); + z = next_z; + } + return (size_t)size; +} + +void *gpr_arena_alloc(gpr_arena *arena, size_t size) { + size_t start = + (size_t)gpr_atm_no_barrier_fetch_add(&arena->size_so_far, size); + zone *z = &arena->initial_zone; + while (start > z->size_begin) { + zone *next_z = (zone *)gpr_atm_acq_load(&z->next_atm); + while (next_z == NULL) { + size_t next_z_size = GPR_MAX(2 * start, size); + next_z = gpr_zalloc(sizeof(zone) + next_z_size); + next_z->size_begin = z->size_end; + next_z->size_end = z->size_end + next_z_size; + if (!gpr_atm_rel_cas(&z->next_atm, (gpr_atm)NULL, (gpr_atm)next_z)) { + gpr_free(next_z); + next_z = NULL; + } + } + z = next_z; + } + if (start + size > z->size_end) { + return gpr_arena_alloc(arena, size); + } + return ((char *)(z + 1)) + start - z->size_begin; +} diff --git a/src/core/lib/support/arena.h b/src/core/lib/support/arena.h new file mode 100644 index 0000000000..c28033ffc3 --- /dev/null +++ b/src/core/lib/support/arena.h @@ -0,0 +1,54 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +// \file Arena based allocator +// Allows very fast allocation of memory, but that memory cannot be freed until +// the arena as a whole is freed +// Tracks the total memory allocated against it, so that future arenas can +// pre-allocate the right amount of memory + +#ifndef GRPC_CORE_LIB_SUPPORT_ARENA_H +#define GRPC_CORE_LIB_SUPPORT_ARENA_H + +#include + +typedef struct gpr_arena gpr_arena; + +// Create an arena, with \a initial_size bytes in the first allocated buffer +gpr_arena *gpr_arena_create(size_t initial_size); +// Allocate \a size bytes from the arena +void *gpr_arena_alloc(gpr_arena *arena, size_t size); +// Destroy an arena, returning the total number of bytes allocated +size_t gpr_arena_destroy(gpr_arena *arena); + +#endif /* GRPC_CORE_LIB_SUPPORT_ARENA_H */ diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index a9f20e6d2a..52d3ffd2c4 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -33,6 +33,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/profiling/basic_timers.c', 'src/core/lib/profiling/stap_timers.c', 'src/core/lib/support/alloc.c', + 'src/core/lib/support/arena.c', 'src/core/lib/support/avl.c', 'src/core/lib/support/backoff.c', 'src/core/lib/support/cmdline.c', diff --git a/test/core/support/arena_test.c b/test/core/support/arena_test.c new file mode 100644 index 0000000000..bbf6d48db3 --- /dev/null +++ b/test/core/support/arena_test.c @@ -0,0 +1,91 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/support/arena.h" + +#include +#include +#include +#include +#include + +#include "src/core/lib/support/string.h" +#include "test/core/util/test_config.h" + +static void test_noop(void) { gpr_arena_destroy(gpr_arena_create(1)); } + +static void test(const char *name, size_t init_size, const size_t *allocs, + size_t nallocs) { + gpr_strvec v; + char *s; + gpr_strvec_init(&v); + gpr_asprintf(&s, "test '%s': %" PRIdPTR " <- {", name, init_size); + gpr_strvec_add(&v, s); + for (size_t i = 0; i < nallocs; i++) { + gpr_asprintf(&s, "%" PRIdPTR ",", allocs[i]); + gpr_strvec_add(&v, s); + } + gpr_strvec_add(&v, gpr_strdup("}")); + s = gpr_strvec_flatten(&v, NULL); + gpr_strvec_destroy(&v); + gpr_log(GPR_INFO, "%s", s); + gpr_free(s); + + gpr_arena *a = gpr_arena_create(init_size); + void **ps = gpr_zalloc(sizeof(*ps) * nallocs); + for (size_t i = 0; i < nallocs; i++) { + ps[i] = gpr_arena_alloc(a, allocs[i]); + for (size_t j = 0; j < i; j++) { + GPR_ASSERT(ps[i] != ps[j]); + } + } + gpr_arena_destroy(a); +} + +#define TEST(name, init_size, ...) \ + static const size_t allocs_##name[] = {__VA_ARGS__}; \ + test(#name, init_size, allocs_##name, GPR_ARRAY_SIZE(allocs_##name)) + +int main(int argc, char *argv[]) { + grpc_test_init(argc, argv); + + test_noop(); + TEST(0_1, 0, 1); + TEST(1_1, 1, 1); + TEST(1_2, 1, 2); + TEST(1_3, 1, 3); + TEST(1_inc, 1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11); + TEST(6_123, 6, 1, 2, 3); + + return 0; +} diff --git a/test/cpp/microbenchmarks/bm_arena.cc b/test/cpp/microbenchmarks/bm_arena.cc new file mode 100644 index 0000000000..33e073f749 --- /dev/null +++ b/test/cpp/microbenchmarks/bm_arena.cc @@ -0,0 +1,69 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Benchmark arenas */ + +extern "C" { +#include "src/core/lib/support/arena.h" +} +#include "test/cpp/microbenchmarks/helpers.h" +#include "third_party/benchmark/include/benchmark/benchmark.h" + +static void BM_Arena_NoOp(benchmark::State& state) { + while (state.KeepRunning()) { + gpr_arena_destroy(gpr_arena_create(state.range(0))); + } +} +BENCHMARK(BM_Arena_NoOp)->Range(1, 1024 * 1024); + +static void BM_Arena_ManyAlloc(benchmark::State& state) { + gpr_arena* a = gpr_arena_create(state.range(0)); + while (state.KeepRunning()) { + gpr_arena_alloc(a, state.range(1)); + } + gpr_arena_destroy(a); +} +BENCHMARK(BM_Arena_ManyAlloc)->Ranges({{1, 1024 * 1024}, {1, 32 * 1024}}); + +static void BM_Arena_Batch(benchmark::State& state) { + while (state.KeepRunning()) { + gpr_arena* a = gpr_arena_create(state.range(0)); + for (int i = 0; i < state.range(1); i++) { + gpr_arena_alloc(a, state.range(2)); + } + gpr_arena_destroy(a); + } +} +BENCHMARK(BM_Arena_Batch)->Ranges({{1, 64 * 1024}, {1, 64}, {1, 1024}}); + +BENCHMARK_MAIN(); diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 131a013451..e4f56f2374 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1228,6 +1228,8 @@ src/core/lib/slice/slice_internal.h \ src/core/lib/slice/slice_string_helpers.c \ src/core/lib/slice/slice_string_helpers.h \ src/core/lib/support/alloc.c \ +src/core/lib/support/arena.c \ +src/core/lib/support/arena.h \ src/core/lib/support/avl.c \ src/core/lib/support/backoff.c \ src/core/lib/support/backoff.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 767c2b2e36..1dc8606885 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -84,6 +84,21 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "gpr_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c", + "name": "arena_test", + "src": [ + "test/core/support/arena_test.c" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", @@ -2429,6 +2444,27 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "benchmark", + "gpr", + "gpr_test_util", + "grpc", + "grpc++", + "grpc++_test_util", + "grpc_benchmark", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c++", + "name": "bm_arena", + "src": [ + "test/cpp/microbenchmarks/bm_arena.cc" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "benchmark", @@ -7243,6 +7279,7 @@ "include/grpc/support/tls_pthread.h", "include/grpc/support/useful.h", "src/core/lib/profiling/timers.h", + "src/core/lib/support/arena.h", "src/core/lib/support/backoff.h", "src/core/lib/support/block_annotate.h", "src/core/lib/support/env.h", @@ -7290,6 +7327,8 @@ "src/core/lib/profiling/stap_timers.c", "src/core/lib/profiling/timers.h", "src/core/lib/support/alloc.c", + "src/core/lib/support/arena.c", + "src/core/lib/support/arena.h", "src/core/lib/support/avl.c", "src/core/lib/support/backoff.c", "src/core/lib/support/backoff.h", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index f9e2a19d0e..1eebc9a1d5 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -89,6 +89,28 @@ "windows" ] }, + { + "args": [], + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": false, + "language": "c", + "name": "arena_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ] + }, { "args": [], "ci_platforms": [ @@ -2605,6 +2627,28 @@ "windows" ] }, + { + "args": [ + "--benchmark_min_time=0" + ], + "ci_platforms": [ + "linux", + "mac", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": false, + "language": "c++", + "name": "bm_arena", + "platforms": [ + "linux", + "mac", + "posix" + ] + }, { "args": [ "--benchmark_min_time=0" diff --git a/vsprojects/buildtests_c.sln b/vsprojects/buildtests_c.sln index daafd3f350..c5bbaed60c 100644 --- a/vsprojects/buildtests_c.sln +++ b/vsprojects/buildtests_c.sln @@ -45,6 +45,15 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "alpn_test", "vcxproj\test\a {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} EndProjectSection EndProject +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "arena_test", "vcxproj\test\arena_test\arena_test.vcxproj", "{D85AC722-A88F-4280-F62E-672F571787FF}" + ProjectSection(myProperties) = preProject + lib = "False" + EndProjectSection + ProjectSection(ProjectDependencies) = postProject + {EAB0A629-17A9-44DB-B5FF-E91A721FE037} = {EAB0A629-17A9-44DB-B5FF-E91A721FE037} + {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} + EndProjectSection +EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "bad_client_test", "vcxproj\test/bad_client\bad_client_test\bad_client_test.vcxproj", "{BA67B418-B699-E41A-9CC4-0279C49481A5}" ProjectSection(myProperties) = preProject lib = "True" @@ -1708,6 +1717,22 @@ Global {5BAAE7EA-A972-DD80-F190-29B9E3110BB3}.Release-DLL|Win32.Build.0 = Release|Win32 {5BAAE7EA-A972-DD80-F190-29B9E3110BB3}.Release-DLL|x64.ActiveCfg = Release|x64 {5BAAE7EA-A972-DD80-F190-29B9E3110BB3}.Release-DLL|x64.Build.0 = Release|x64 + {D85AC722-A88F-4280-F62E-672F571787FF}.Debug|Win32.ActiveCfg = Debug|Win32 + {D85AC722-A88F-4280-F62E-672F571787FF}.Debug|x64.ActiveCfg = Debug|x64 + {D85AC722-A88F-4280-F62E-672F571787FF}.Release|Win32.ActiveCfg = Release|Win32 + {D85AC722-A88F-4280-F62E-672F571787FF}.Release|x64.ActiveCfg = Release|x64 + {D85AC722-A88F-4280-F62E-672F571787FF}.Debug|Win32.Build.0 = Debug|Win32 + {D85AC722-A88F-4280-F62E-672F571787FF}.Debug|x64.Build.0 = Debug|x64 + {D85AC722-A88F-4280-F62E-672F571787FF}.Release|Win32.Build.0 = Release|Win32 + {D85AC722-A88F-4280-F62E-672F571787FF}.Release|x64.Build.0 = Release|x64 + {D85AC722-A88F-4280-F62E-672F571787FF}.Debug-DLL|Win32.ActiveCfg = Debug|Win32 + {D85AC722-A88F-4280-F62E-672F571787FF}.Debug-DLL|Win32.Build.0 = Debug|Win32 + {D85AC722-A88F-4280-F62E-672F571787FF}.Debug-DLL|x64.ActiveCfg = Debug|x64 + {D85AC722-A88F-4280-F62E-672F571787FF}.Debug-DLL|x64.Build.0 = Debug|x64 + {D85AC722-A88F-4280-F62E-672F571787FF}.Release-DLL|Win32.ActiveCfg = Release|Win32 + {D85AC722-A88F-4280-F62E-672F571787FF}.Release-DLL|Win32.Build.0 = Release|Win32 + {D85AC722-A88F-4280-F62E-672F571787FF}.Release-DLL|x64.ActiveCfg = Release|x64 + {D85AC722-A88F-4280-F62E-672F571787FF}.Release-DLL|x64.Build.0 = Release|x64 {BA67B418-B699-E41A-9CC4-0279C49481A5}.Debug|Win32.ActiveCfg = Debug|Win32 {BA67B418-B699-E41A-9CC4-0279C49481A5}.Debug|x64.ActiveCfg = Debug|x64 {BA67B418-B699-E41A-9CC4-0279C49481A5}.Release|Win32.ActiveCfg = Release|Win32 diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj b/vsprojects/vcxproj/gpr/gpr.vcxproj index 44c21ddeb3..1b37ace69a 100644 --- a/vsprojects/vcxproj/gpr/gpr.vcxproj +++ b/vsprojects/vcxproj/gpr/gpr.vcxproj @@ -188,6 +188,7 @@ + @@ -208,6 +209,8 @@ + + diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters index a5924a624a..fafd54cdf9 100644 --- a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters +++ b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters @@ -10,6 +10,9 @@ src\core\lib\support + + src\core\lib\support + src\core\lib\support @@ -254,6 +257,9 @@ src\core\lib\profiling + + src\core\lib\support + src\core\lib\support diff --git a/vsprojects/vcxproj/test/arena_test/arena_test.vcxproj b/vsprojects/vcxproj/test/arena_test/arena_test.vcxproj new file mode 100644 index 0000000000..5ae2f8e483 --- /dev/null +++ b/vsprojects/vcxproj/test/arena_test/arena_test.vcxproj @@ -0,0 +1,193 @@ + + + + + + Debug + Win32 + + + Debug + x64 + + + Release + Win32 + + + Release + x64 + + + + {D85AC722-A88F-4280-F62E-672F571787FF} + true + $(SolutionDir)IntDir\$(MSBuildProjectName)\ + + + + v100 + + + v110 + + + v120 + + + v140 + + + Application + true + Unicode + + + Application + false + true + Unicode + + + + + + + + + + + + + + arena_test + static + Debug + static + Debug + + + arena_test + static + Release + static + Release + + + + NotUsing + Level3 + Disabled + WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) + true + MultiThreadedDebug + true + None + false + + + Console + true + false + + + + + + NotUsing + Level3 + Disabled + WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) + true + MultiThreadedDebug + true + None + false + + + Console + true + false + + + + + + NotUsing + Level3 + MaxSpeed + WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) + true + true + true + MultiThreaded + true + None + false + + + Console + true + false + true + true + + + + + + NotUsing + Level3 + MaxSpeed + WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) + true + true + true + MultiThreaded + true + None + false + + + Console + true + false + true + true + + + + + + + + + + {EAB0A629-17A9-44DB-B5FF-E91A721FE037} + + + {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} + + + + + + + + + + + + + + + This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. + + + + + + + + + diff --git a/vsprojects/vcxproj/test/arena_test/arena_test.vcxproj.filters b/vsprojects/vcxproj/test/arena_test/arena_test.vcxproj.filters new file mode 100644 index 0000000000..c470f17527 --- /dev/null +++ b/vsprojects/vcxproj/test/arena_test/arena_test.vcxproj.filters @@ -0,0 +1,21 @@ + + + + + test\core\support + + + + + + {130788b2-eacc-90df-a4f6-f5102a7d3370} + + + {5c3e1753-6fdb-9476-f98c-a3a394fac54a} + + + {1d3d2cc8-4e69-8b2e-6ceb-6569fcb19a86} + + + + -- cgit v1.2.3 From 0dd81003b5a9601d4bc79afdd269a7e93b53c5a9 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Mar 2017 06:57:29 -0700 Subject: Concurrent test --- src/core/lib/support/arena.c | 5 +++++ test/core/support/arena_test.c | 43 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) (limited to 'src/core/lib') diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c index a5b0be4d48..faceb1a1eb 100644 --- a/src/core/lib/support/arena.c +++ b/src/core/lib/support/arena.c @@ -36,6 +36,9 @@ #include #include +#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \ + (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u)) + typedef struct zone { size_t size_begin; size_t size_end; @@ -48,6 +51,7 @@ struct gpr_arena { }; gpr_arena *gpr_arena_create(size_t initial_size) { + initial_size = ROUND_UP_TO_ALIGNMENT_SIZE(initial_size); gpr_arena *a = gpr_zalloc(sizeof(gpr_arena) + initial_size); a->initial_zone.size_end = initial_size; return a; @@ -66,6 +70,7 @@ size_t gpr_arena_destroy(gpr_arena *arena) { } void *gpr_arena_alloc(gpr_arena *arena, size_t size) { + size = ROUND_UP_TO_ALIGNMENT_SIZE(size); size_t start = (size_t)gpr_atm_no_barrier_fetch_add(&arena->size_so_far, size); zone *z = &arena->initial_zone; diff --git a/test/core/support/arena_test.c b/test/core/support/arena_test.c index bbf6d48db3..3e21867bcf 100644 --- a/test/core/support/arena_test.c +++ b/test/core/support/arena_test.c @@ -36,8 +36,11 @@ #include #include #include +#include +#include #include #include +#include #include "src/core/lib/support/string.h" #include "test/core/util/test_config.h" @@ -65,9 +68,12 @@ static void test(const char *name, size_t init_size, const size_t *allocs, void **ps = gpr_zalloc(sizeof(*ps) * nallocs); for (size_t i = 0; i < nallocs; i++) { ps[i] = gpr_arena_alloc(a, allocs[i]); + // ensure no duplicate results for (size_t j = 0; j < i; j++) { GPR_ASSERT(ps[i] != ps[j]); } + // ensure writable + memset(ps[i], 1, allocs[i]); } gpr_arena_destroy(a); } @@ -76,6 +82,42 @@ static void test(const char *name, size_t init_size, const size_t *allocs, static const size_t allocs_##name[] = {__VA_ARGS__}; \ test(#name, init_size, allocs_##name, GPR_ARRAY_SIZE(allocs_##name)) +#define CONCURRENT_TEST_ITERATIONS 100000 +#define CONCURRENT_TEST_THREADS 100 + +typedef struct { + gpr_event ev_start; + gpr_arena *arena; +} concurrent_test_args; + +static void concurrent_test_body(void *arg) { + concurrent_test_args *a = arg; + gpr_event_wait(&a->ev_start, gpr_inf_future(GPR_CLOCK_REALTIME)); + for (size_t i = 0; i < CONCURRENT_TEST_ITERATIONS; i++) { + *(char *)gpr_arena_alloc(a->arena, 1) = (char)i; + } +} + +static void concurrent_test(void) { + concurrent_test_args args; + gpr_event_init(&args.ev_start); + args.arena = gpr_arena_create(1024); + + gpr_thd_id thds[CONCURRENT_TEST_THREADS]; + + for (int i = 0; i < CONCURRENT_TEST_THREADS; i++) { + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_is_joinable(&opt); + gpr_thd_new(&thds[i], concurrent_test_body, &args, &opt); + } + + gpr_event_set(&args.ev_start, (void *)1); + + for (int i = 0; i < CONCURRENT_TEST_THREADS; i++) { + gpr_thd_join(thds[i]); + } +} + int main(int argc, char *argv[]) { grpc_test_init(argc, argv); @@ -86,6 +128,7 @@ int main(int argc, char *argv[]) { TEST(1_3, 1, 3); TEST(1_inc, 1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11); TEST(6_123, 6, 1, 2, 3); + concurrent_test(); return 0; } -- cgit v1.2.3 From 37723c9ee07c05633b763cfd5cfe0935a20a9258 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Mar 2017 07:45:44 -0700 Subject: Fix race condition --- src/core/lib/support/arena.c | 11 +++++++---- test/core/support/arena_test.c | 7 ++++++- 2 files changed, 13 insertions(+), 5 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c index faceb1a1eb..b5c32b2041 100644 --- a/src/core/lib/support/arena.c +++ b/src/core/lib/support/arena.c @@ -34,6 +34,7 @@ #include "src/core/lib/support/arena.h" #include #include +#include #include #define ROUND_UP_TO_ALIGNMENT_SIZE(x) \ @@ -74,16 +75,16 @@ void *gpr_arena_alloc(gpr_arena *arena, size_t size) { size_t start = (size_t)gpr_atm_no_barrier_fetch_add(&arena->size_so_far, size); zone *z = &arena->initial_zone; - while (start > z->size_begin) { + while (start > z->size_end) { zone *next_z = (zone *)gpr_atm_acq_load(&z->next_atm); - while (next_z == NULL) { - size_t next_z_size = GPR_MAX(2 * start, size); + if (next_z == NULL) { + size_t next_z_size = GPR_MAX((size_t)gpr_atm_no_barrier_load(&arena->size_so_far), size); next_z = gpr_zalloc(sizeof(zone) + next_z_size); next_z->size_begin = z->size_end; next_z->size_end = z->size_end + next_z_size; if (!gpr_atm_rel_cas(&z->next_atm, (gpr_atm)NULL, (gpr_atm)next_z)) { gpr_free(next_z); - next_z = NULL; + next_z = (zone*)gpr_atm_acq_load(&z->next_atm); } } z = next_z; @@ -91,5 +92,7 @@ void *gpr_arena_alloc(gpr_arena *arena, size_t size) { if (start + size > z->size_end) { return gpr_arena_alloc(arena, size); } + GPR_ASSERT(start >= z->size_begin); + GPR_ASSERT(start + size <= z->size_end); return ((char *)(z + 1)) + start - z->size_begin; } diff --git a/test/core/support/arena_test.c b/test/core/support/arena_test.c index 3e21867bcf..35b2bbd1b1 100644 --- a/test/core/support/arena_test.c +++ b/test/core/support/arena_test.c @@ -76,6 +76,7 @@ static void test(const char *name, size_t init_size, const size_t *allocs, memset(ps[i], 1, allocs[i]); } gpr_arena_destroy(a); + gpr_free(ps); } #define TEST(name, init_size, ...) \ @@ -99,6 +100,8 @@ static void concurrent_test_body(void *arg) { } static void concurrent_test(void) { + gpr_log(GPR_DEBUG, "concurrent_test"); + concurrent_test_args args; gpr_event_init(&args.ev_start); args.arena = gpr_arena_create(1024); @@ -107,7 +110,7 @@ static void concurrent_test(void) { for (int i = 0; i < CONCURRENT_TEST_THREADS; i++) { gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_is_joinable(&opt); + gpr_thd_options_set_joinable(&opt); gpr_thd_new(&thds[i], concurrent_test_body, &args, &opt); } @@ -116,6 +119,8 @@ static void concurrent_test(void) { for (int i = 0; i < CONCURRENT_TEST_THREADS; i++) { gpr_thd_join(thds[i]); } + + gpr_arena_destroy(args.arena); } int main(int argc, char *argv[]) { -- cgit v1.2.3 From 09acb108b7620eff840fb0428f7eb159a4f27cf4 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Mar 2017 07:45:59 -0700 Subject: clang-format --- src/core/lib/support/arena.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c index b5c32b2041..6610acd3a0 100644 --- a/src/core/lib/support/arena.c +++ b/src/core/lib/support/arena.c @@ -78,13 +78,14 @@ void *gpr_arena_alloc(gpr_arena *arena, size_t size) { while (start > z->size_end) { zone *next_z = (zone *)gpr_atm_acq_load(&z->next_atm); if (next_z == NULL) { - size_t next_z_size = GPR_MAX((size_t)gpr_atm_no_barrier_load(&arena->size_so_far), size); + size_t next_z_size = + GPR_MAX((size_t)gpr_atm_no_barrier_load(&arena->size_so_far), size); next_z = gpr_zalloc(sizeof(zone) + next_z_size); next_z->size_begin = z->size_end; next_z->size_end = z->size_end + next_z_size; if (!gpr_atm_rel_cas(&z->next_atm, (gpr_atm)NULL, (gpr_atm)next_z)) { gpr_free(next_z); - next_z = (zone*)gpr_atm_acq_load(&z->next_atm); + next_z = (zone *)gpr_atm_acq_load(&z->next_atm); } } z = next_z; -- cgit v1.2.3 From e7a1702fe99ef598efd957c1f7546904298b9dba Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Mar 2017 10:20:38 -0700 Subject: Fixes --- src/core/lib/channel/channel_stack.c | 7 ++++--- src/core/lib/channel/channel_stack.h | 10 +++++----- src/core/lib/channel/compress_filter.c | 2 +- src/core/lib/channel/connected_channel.c | 4 ++-- src/core/lib/channel/deadline_filter.c | 2 +- src/core/lib/channel/http_client_filter.c | 2 +- src/core/lib/channel/http_server_filter.c | 2 +- src/core/lib/channel/message_size_filter.c | 2 +- src/core/lib/surface/call.c | 22 ++++++++++++++++++---- src/core/lib/surface/lame_client.c | 4 ++-- src/core/lib/surface/server.c | 2 +- src/core/lib/transport/transport.h | 3 ++- src/core/lib/transport/transport_impl.h | 3 ++- 13 files changed, 41 insertions(+), 24 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 3fb2a60ac7..187f2c8b34 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -241,15 +241,16 @@ void grpc_call_stack_ignore_set_pollset_or_pollset_set( void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *then_schedule_closure) { grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack); size_t count = stack->count; size_t i; /* destroy per-filter data */ for (i = 0; i < count; i++) { - elems[i].filter->destroy_call_elem(exec_ctx, &elems[i], final_info, - i == count - 1 ? and_free_memory : NULL); + elems[i].filter->destroy_call_elem( + exec_ctx, &elems[i], final_info, + i == count - 1 ? then_schedule_closure : NULL); } } diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 6d3340bcbf..1b02f02dd6 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -139,12 +139,12 @@ typedef struct { /* Destroy per call data. The filter does not need to do any chaining. The bottom filter of a stack will be passed a non-NULL pointer to - \a and_free_memory that should be passed to gpr_free when destruction - is complete. \a final_info contains data about the completed call, mainly - for reporting purposes. */ + \a then_schedule_closure that should be passed to grpc_closure_sched when + destruction is complete. \a final_info contains data about the completed + call, mainly for reporting purposes. */ void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory); + grpc_closure *then_schedule_closure); /* sizeof(per channel data) */ size_t sizeof_channel_data; @@ -271,7 +271,7 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, /* Destroy a call stack */ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, const grpc_call_final_info *final_info, - void *and_free_memory); + grpc_closure *then_schedule_closure); /* Ignore set pollset{_set} - used by filters if they don't care about pollsets * at all. Does nothing. */ diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index aa41014a21..02dc479f3a 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -292,7 +292,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 29796f7ca7..64cf95b5a5 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -105,12 +105,12 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *then_schedule_closure) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_transport_destroy_stream(exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - and_free_memory); + then_schedule_closure); } /* Constructor for channel_data */ diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 5a12d62f1d..34114bbebf 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -256,7 +256,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, // Destructor for call_data. Used for both client and server filters. static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, - void* and_free_memory) { + grpc_closure* ignored) { grpc_deadline_state_destroy(exec_ctx, elem); } diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index c031533dd8..f9d0d689ac 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -412,7 +412,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); } diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index fb70de8e96..bebd3af335 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -358,7 +358,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; grpc_slice_buffer_destroy_internal(exec_ctx, &calld->read_slice_buffer); } diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index b424c0d2ac..5ba13fe251 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -200,7 +200,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, // Destructor for call_data. static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, - void* ignored) {} + grpc_closure* ignored) {} // Constructor for channel_data. static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index c2547c5147..b57dd8bd8a 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -51,6 +51,7 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -138,6 +139,7 @@ typedef struct batch_control { } batch_control; struct grpc_call { + gpr_arena *arena; grpc_completion_queue *cq; grpc_polling_entity pollent; grpc_channel *channel; @@ -212,6 +214,8 @@ struct grpc_call { grpc_closure receiving_initial_metadata_ready; uint32_t test_only_last_message_flags; + grpc_closure release_call; + union { struct { grpc_status_code *status; @@ -273,7 +277,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, grpc_channel_get_channel_stack(args->channel); grpc_call *call; GPR_TIMER_BEGIN("grpc_call_create", 0); - call = gpr_zalloc(sizeof(grpc_call) + channel_stack->call_stack_size); + gpr_arena *arena = gpr_arena_create(8192); + call = gpr_arena_alloc(arena, + sizeof(grpc_call) + channel_stack->call_stack_size); + call->arena = arena; *out_call = call; gpr_mu_init(&call->mu); call->channel = args->channel; @@ -421,6 +428,13 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } +static void release_call(grpc_exec_ctx *exec_ctx, void *call, + grpc_error *error) { + grpc_call *c = call; + gpr_arena_destroy(c->arena); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); +} + static void set_status_value_directly(grpc_status_code status, void *dest); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { @@ -447,7 +461,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, if (c->cq) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } - grpc_channel *channel = c->channel; get_final_status(call, set_status_value_directly, &c->final_info.final_status, NULL); @@ -459,8 +472,9 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error); } - grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); + grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, + grpc_closure_init(&c->release_call, release_call, c, + grpc_schedule_on_exec_ctx)); GPR_TIMER_END("destroy_call", 0); } diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 49bc4c114b..9ddb88bd11 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -130,8 +130,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { - gpr_free(and_free_memory); + grpc_closure *then_schedule_closure) { + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index b360579553..1186a4af63 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -898,7 +898,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index cc1c277b35..11fd5d1697 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -246,7 +246,8 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport, caller, but any child memory must be cleaned up) */ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - grpc_stream *stream, void *and_free_memory); + grpc_stream *stream, + grpc_closure *then_schedule_closure); void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index 8553148c35..9b6095f666 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -67,7 +67,8 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream, void *and_free_memory); + grpc_stream *stream, + grpc_closure *then_schedule_closure); /* implementation of grpc_transport_destroy */ void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self); -- cgit v1.2.3 From d426caca811823f525334ec5952995e0b887f04f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Mar 2017 12:30:45 -0700 Subject: Use an arena for call & subchannel_call allocation --- src/core/ext/census/grpc_filter.c | 4 +-- src/core/ext/client_channel/client_channel.c | 26 +++++++++++++---- src/core/ext/client_channel/subchannel.c | 33 ++++++++++++++++------ src/core/ext/client_channel/subchannel.h | 18 ++++++++++-- .../ext/load_reporting/load_reporting_filter.c | 2 +- .../transport/chttp2/transport/chttp2_transport.c | 7 +++-- src/core/ext/transport/chttp2/transport/internal.h | 2 +- src/core/lib/channel/channel_stack.c | 29 +++++++------------ src/core/lib/channel/channel_stack.h | 13 +++++---- .../lib/security/transport/client_auth_filter.c | 2 +- .../lib/security/transport/server_auth_filter.c | 2 +- src/core/lib/surface/call.c | 15 ++++++---- src/core/lib/transport/transport.c | 5 ++-- test/core/channel/channel_stack_test.c | 16 +++++++---- test/core/end2end/tests/filter_call_init_fails.c | 2 +- test/core/end2end/tests/filter_causes_close.c | 2 +- test/core/end2end/tests/filter_latency.c | 4 +-- 17 files changed, 116 insertions(+), 66 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c index b80d831557..fc29dbd454 100644 --- a/src/core/ext/census/grpc_filter.c +++ b/src/core/ext/census/grpc_filter.c @@ -138,7 +138,7 @@ static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx, static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */ @@ -160,7 +160,7 @@ static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx, static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */ diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index bf64f84772..f6550292a0 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -660,6 +660,7 @@ typedef struct client_channel_call_data { /** either 0 for no call, 1 for cancelled, or a pointer to a grpc_subchannel_call */ gpr_atm subchannel_call; + gpr_arena *arena; subchannel_creation_phase creation_phase; grpc_connected_subchannel *connected_subchannel; @@ -754,9 +755,14 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { /* Create call on subchannel. */ grpc_subchannel_call *subchannel_call = NULL; + grpc_connected_subchannel_call_args call_args = { + .pollent = calld->pollent, + .path = calld->path, + .start_time = calld->call_start_time, + .deadline = calld->deadline, + .arena = calld->arena}; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->call_start_time, calld->deadline, &subchannel_call); + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); subchannel_call = CANCELLED_CALL; @@ -982,9 +988,14 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; + grpc_connected_subchannel_call_args call_args = { + .pollent = calld->pollent, + .path = calld->path, + .start_time = calld->call_start_time, + .deadline = calld->deadline, + .arena = calld->arena}; grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->call_start_time, calld->deadline, &subchannel_call); + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); @@ -1161,6 +1172,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; calld->owning_call = args->call_stack; calld->pollent = NULL; + calld->arena = args->arena; GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config"); grpc_closure_sched( exec_ctx, @@ -1175,7 +1187,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *then_schedule_closure) { call_data *calld = elem->call_data; grpc_deadline_state_destroy(exec_ctx, elem); grpc_slice_unref_internal(exec_ctx, calld->path); @@ -1185,6 +1197,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { + grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure); + then_schedule_closure = NULL; GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); @@ -1194,7 +1208,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, "picked"); } gpr_free(calld->waiting_ops); - gpr_free(and_free_memory); + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 5df0a9060d..e4b669c582 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -148,6 +148,7 @@ struct grpc_subchannel { struct grpc_subchannel_call { grpc_connected_subchannel *connection; + grpc_closure *schedule_closure_after_destroy; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) @@ -719,13 +720,22 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_subchannel_call *c = call; + GPR_ASSERT(c->schedule_closure_after_destroy != NULL); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); grpc_connected_subchannel *connection = c->connection; - grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, c); + grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, + c->schedule_closure_after_destroy); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } +void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call *call, + grpc_closure *closure) { + GPR_ASSERT(call->schedule_closure_after_destroy == NULL); + GPR_ASSERT(closure != NULL); + call->schedule_closure_after_destroy = closure; +} + void grpc_subchannel_call_ref( grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); @@ -761,15 +771,22 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, - gpr_timespec deadline, grpc_subchannel_call **call) { + const grpc_connected_subchannel_call_args *args, + grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - *call = gpr_zalloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); + *call = gpr_arena_alloc( + args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. - grpc_error *error = - grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, path, start_time, deadline, callstk); + grpc_call_element_args call_args = {.call_stack = callstk, + .server_transport_data = NULL, + .context = NULL, + .path = args->path, + .start_time = args->start_time, + .deadline = args->deadline, + .arena = args->arena}; + grpc_error *error = grpc_call_stack_init( + exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); @@ -778,7 +795,7 @@ grpc_error *grpc_connected_subchannel_create_call( return error; } GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); - grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent); + grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, args->pollent); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 6a70a76467..3e64a2507c 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -37,6 +37,7 @@ #include "src/core/ext/client_channel/connector.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" @@ -112,10 +113,18 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_REF_EXTRA_ARGS); /** construct a subchannel call */ +typedef struct { + grpc_polling_entity *pollent; + grpc_slice path; + gpr_timespec start_time; + gpr_timespec deadline; + gpr_arena *arena; +} grpc_connected_subchannel_call_args; + grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, - gpr_timespec deadline, grpc_subchannel_call **subchannel_call); + const grpc_connected_subchannel_call_args *args, + grpc_subchannel_call **subchannel_call); /** process a transport level op */ void grpc_connected_subchannel_process_transport_op( @@ -154,6 +163,11 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call); +/** Must be called once per call. Sets the 'then_schedule_closure' argument for + call stack destruction. */ +void grpc_subchannel_call_set_cleanup_closure( + grpc_subchannel_call *subchannel_call, grpc_closure *closure); + grpc_call_stack *grpc_subchannel_call_get_call_stack( grpc_subchannel_call *subchannel_call); diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index c2750634a5..4ed832671d 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -123,7 +123,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; /* TODO(dgq): do something with the data diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index da4c7dc7b2..8024dd4702 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -665,16 +665,17 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_TIMER_END("destroy_stream", 0); - gpr_free(s->destroy_stream_arg); + grpc_closure_sched(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { + grpc_stream *gs, + grpc_closure *then_schedule_closure) { GPR_TIMER_BEGIN("destroy_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - s->destroy_stream_arg = and_free_memory; + s->destroy_stream_arg = then_schedule_closure; grpc_closure_sched( exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s, grpc_combiner_scheduler(t->combiner, false)), diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index d26812ad6b..3c56c21599 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -425,7 +425,7 @@ struct grpc_chttp2_stream { grpc_stream_refcount *refcount; grpc_closure destroy_stream; - void *destroy_stream_arg; + grpc_closure *destroy_stream_arg; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; uint8_t included[STREAM_LIST_COUNT]; diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 187f2c8b34..6d53b0576e 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -166,41 +166,32 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, } } -grpc_error *grpc_call_stack_init( - grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, - int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, - grpc_call_context_element *context, const void *transport_server_data, - grpc_slice path, gpr_timespec start_time, gpr_timespec deadline, - grpc_call_stack *call_stack) { +grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, + int initial_refs, grpc_iomgr_cb_func destroy, + void *destroy_arg, + const grpc_call_element_args *elem_args) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); size_t count = channel_stack->count; grpc_call_element *call_elems; char *user_data; size_t i; - call_stack->count = count; - GRPC_STREAM_REF_INIT(&call_stack->refcount, initial_refs, destroy, + elem_args->call_stack->count = count; + GRPC_STREAM_REF_INIT(&elem_args->call_stack->refcount, initial_refs, destroy, destroy_arg, "CALL_STACK"); - call_elems = CALL_ELEMS_FROM_STACK(call_stack); + call_elems = CALL_ELEMS_FROM_STACK(elem_args->call_stack); user_data = ((char *)call_elems) + ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); /* init per-filter data */ grpc_error *first_error = GRPC_ERROR_NONE; - const grpc_call_element_args args = { - .start_time = start_time, - .call_stack = call_stack, - .server_transport_data = transport_server_data, - .context = context, - .path = path, - .deadline = deadline, - }; for (i = 0; i < count; i++) { call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].call_data = user_data; - grpc_error *error = - call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args); + grpc_error *error = call_elems[i].filter->init_call_elem( + exec_ctx, &call_elems[i], elem_args); if (error != GRPC_ERROR_NONE) { if (first_error == GRPC_ERROR_NONE) { first_error = error; diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 1b02f02dd6..80e3603e8d 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -56,6 +56,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/transport/transport.h" #ifdef __cplusplus @@ -84,6 +85,7 @@ typedef struct { grpc_slice path; gpr_timespec start_time; gpr_timespec deadline; + gpr_arena *arena; } grpc_call_element_args; typedef struct { @@ -236,12 +238,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, /* Initialize a call stack given a channel stack. transport_server_data is expected to be NULL on a client, or an opaque transport owned pointer on the server. */ -grpc_error *grpc_call_stack_init( - grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, - int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, - grpc_call_context_element *context, const void *transport_server_data, - grpc_slice path, gpr_timespec start_time, gpr_timespec deadline, - grpc_call_stack *call_stack); +grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, + int initial_refs, grpc_iomgr_cb_func destroy, + void *destroy_arg, + const grpc_call_element_args *elem_args); /* Set a pollset or a pollset_set for a call stack: must occur before the first * op is started */ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index a23082a866..8dea1d98ff 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -318,7 +318,7 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; grpc_call_credentials_unref(exec_ctx, calld->creds); if (calld->have_host) { diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 14619d97ca..01cb473177 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -227,7 +227,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) {} + grpc_closure *ignored) {} /* Constructor for channel_data */ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index b57dd8bd8a..84c401db35 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -367,11 +367,16 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); /* initial refcount dropped by grpc_call_destroy */ + grpc_call_element_args call_args = { + .call_stack = CALL_STACK_FROM_CALL(call), + .server_transport_data = args->server_transport_data, + .context = call->context, + .path = path, + .start_time = call->start_time, + .deadline = send_deadline, + .arena = call->arena}; add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1, - destroy_call, call, call->context, - args->server_transport_data, path, - call->start_time, send_deadline, - CALL_STACK_FROM_CALL(call))); + destroy_call, call, &call_args)); if (error != GRPC_ERROR_NONE) { cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); @@ -431,8 +436,8 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { static void release_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_call *c = call; - gpr_arena_destroy(c->arena); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); + gpr_arena_destroy(c->arena); } static void set_status_value_directly(grpc_status_code status, void *dest); diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 165950e288..7635fc730d 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -197,9 +197,10 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport, void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - grpc_stream *stream, void *and_free_memory) { + grpc_stream *stream, + grpc_closure *then_schedule_closure) { transport->vtable->destroy_stream(exec_ctx, transport, stream, - and_free_memory); + then_schedule_closure); } char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index 76bb57346c..0c424c1ec4 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -68,7 +68,7 @@ static void channel_destroy_func(grpc_exec_ctx *exec_ctx, static void call_destroy_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { ++*(int *)(elem->channel_data); } @@ -139,10 +139,16 @@ static void test_create_channel_stack(void) { GPR_ASSERT(*channel_data == 0); call_stack = gpr_malloc(channel_stack->call_stack_size); - grpc_error *error = - grpc_call_stack_init(&exec_ctx, channel_stack, 1, free_call, call_stack, - NULL, NULL, path, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC), call_stack); + grpc_call_element_args args = { + .call_stack = call_stack, + .server_transport_data = NULL, + .context = NULL, + .path = path, + .start_time = gpr_now(GPR_CLOCK_MONOTONIC), + .deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC), + .arena = NULL}; + grpc_error *error = grpc_call_stack_init(&exec_ctx, channel_stack, 1, + free_call, call_stack, &args); GPR_ASSERT(error == GRPC_ERROR_NONE); GPR_ASSERT(call_stack->count == 1); call_elem = grpc_call_stack_element(call_stack, 0); diff --git a/test/core/end2end/tests/filter_call_init_fails.c b/test/core/end2end/tests/filter_call_init_fails.c index d2d6e82d57..65216cf19d 100644 --- a/test/core/end2end/tests/filter_call_init_fails.c +++ b/test/core/end2end/tests/filter_call_init_fails.c @@ -213,7 +213,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) {} + grpc_closure *ignored) {} static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index 25e606556d..c968f30b3b 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -236,7 +236,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) {} + grpc_closure *ignored) {} static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, diff --git a/test/core/end2end/tests/filter_latency.c b/test/core/end2end/tests/filter_latency.c index d05e9e79a1..2428c92a42 100644 --- a/test/core/end2end/tests/filter_latency.c +++ b/test/core/end2end/tests/filter_latency.c @@ -267,7 +267,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *ignored) { gpr_mu_lock(&g_mu); g_client_latency = final_info->stats.latency; gpr_mu_unlock(&g_mu); @@ -276,7 +276,7 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *ignored) { gpr_mu_lock(&g_mu); g_server_latency = final_info->stats.latency; gpr_mu_unlock(&g_mu); -- cgit v1.2.3 From 7d2c39827688026ca54b6a8a4d36c36d79aa282a Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Mar 2017 12:58:29 -0700 Subject: Additionally use arena for incoming metadata allocation --- .../transport/chttp2/transport/chttp2_transport.c | 26 +++++---- .../transport/chttp2/transport/incoming_metadata.c | 65 ++++++++-------------- .../transport/chttp2/transport/incoming_metadata.h | 18 +++--- src/core/ext/transport/chttp2/transport/parsing.c | 18 +++++- src/core/lib/channel/connected_channel.c | 2 +- src/core/lib/transport/transport.c | 4 +- src/core/lib/transport/transport.h | 3 +- src/core/lib/transport/transport_impl.h | 2 +- 8 files changed, 67 insertions(+), 71 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 8024dd4702..114b14e884 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -575,7 +575,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { GPR_TIMER_BEGIN("init_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; @@ -588,8 +588,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_ref_init(&s->active_streams, 1); GRPC_CHTTP2_STREAM_REF(s, "chttp2"); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena); grpc_chttp2_data_parser_init(&s->data_parser); grpc_slice_buffer_init(&s->flow_controlled_buffer); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); @@ -1630,15 +1630,19 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, s->recv_trailing_metadata_finished != NULL) { char status_string[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(status, status_string); - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_STATUS, - grpc_slice_from_copied_string(status_string))); + GRPC_LOG_IF_ERROR("add_status", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_GRPC_STATUS, + grpc_slice_from_copied_string(status_string)))); if (msg != NULL) { - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_from_copied_string(msg))); + GRPC_LOG_IF_ERROR( + "add_status_message", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_slice_from_copied_string(msg)))); } s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index c91b019aa0..ab900fdff2 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -41,69 +41,48 @@ #include void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer) { - buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) { + grpc_metadata_batch_init(&buffer->batch); + buffer->arena = arena; + buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) { - size_t i; - if (!buffer->published) { - for (i = 0; i < buffer->count; i++) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - gpr_free(buffer->elems); + grpc_metadata_batch_destroy(exec_ctx, &buffer->batch); } -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - GPR_ASSERT(!buffer->published); - if (buffer->capacity == buffer->count) { - buffer->capacity = GPR_MAX(8, 2 * buffer->capacity); - buffer->elems = - gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity); - } - buffer->elems[buffer->count++].md = elem; +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) { buffer->size += GRPC_MDELEM_LENGTH(elem); + return grpc_metadata_batch_add_tail( + exec_ctx, &buffer->batch, + gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem); } -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - for (size_t i = 0; i < buffer->count; i++) { - if (grpc_slice_eq(GRPC_MDKEY(buffer->elems[i].md), GRPC_MDKEY(elem))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - buffer->elems[i].md = elem; - return; + for (grpc_linked_mdelem *l = buffer->batch.list.head; l != NULL; + l = l->next) { + if (grpc_slice_eq(GRPC_MDKEY(l->md), GRPC_MDKEY(elem))) { + GRPC_MDELEM_UNREF(exec_ctx, l->md); + l->md = elem; + return GRPC_ERROR_NONE; } } - grpc_chttp2_incoming_metadata_buffer_add(buffer, elem); + return grpc_chttp2_incoming_metadata_buffer_add(exec_ctx, buffer, elem); } void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) { - GPR_ASSERT(!buffer->published); - buffer->deadline = deadline; + buffer->batch.deadline = deadline; } void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch) { - GPR_ASSERT(!buffer->published); - buffer->published = 1; - if (buffer->count > 0) { - size_t i; - for (i = 0; i < buffer->count; i++) { - /* TODO(ctiller): do something better here */ - if (!GRPC_LOG_IF_ERROR("grpc_chttp2_incoming_metadata_buffer_publish", - grpc_metadata_batch_link_tail( - exec_ctx, batch, &buffer->elems[i]))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - } else { - batch->list.head = batch->list.tail = NULL; - } - batch->deadline = buffer->deadline; + *batch = buffer->batch; + grpc_metadata_batch_init(&buffer->batch); } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h index 1eac6fc150..288c917e65 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h @@ -37,28 +37,26 @@ #include "src/core/lib/transport/transport.h" typedef struct { - grpc_linked_mdelem *elems; - size_t count; - size_t capacity; - gpr_timespec deadline; - int published; + gpr_arena *arena; + grpc_metadata_batch batch; size_t size; // total size of metadata } grpc_chttp2_incoming_metadata_buffer; /** assumes everything initially zeroed */ void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena); void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer); void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch); -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem); -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, - grpc_mdelem elem); + grpc_mdelem elem) GRPC_MUST_USE_RESULT; +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) GRPC_MUST_USE_RESULT; void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline); diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index e7f2597f89..7efc8c63c9 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -548,7 +548,14 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[0], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } } @@ -598,7 +605,14 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[1], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } GPR_TIMER_END("on_trailing_header", 0); diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 64cf95b5a5..42ef7b7806 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -88,7 +88,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; int r = grpc_transport_init_stream( exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - &args->call_stack->refcount, args->server_transport_data); + &args->call_stack->refcount, args->server_transport_data, args->arena); return r == 0 ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE("transport stream initialization failed"); } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 7635fc730d..c1a364cbbf 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -162,9 +162,9 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { return transport->vtable->init_stream(exec_ctx, transport, stream, refcount, - server_data); + server_data, arena); } void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 11fd5d1697..950b18aeda 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -41,6 +41,7 @@ #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/transport/byte_stream.h" #include "src/core/lib/transport/metadata_batch.h" @@ -229,7 +230,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport); int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, grpc_stream_refcount *refcount, - const void *server_data); + const void *server_data, gpr_arena *arena); void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, grpc_polling_entity *pollent); diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index 9b6095f666..6f688bf8d2 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -47,7 +47,7 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_init_stream */ int (*init_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self, grpc_stream *stream, grpc_stream_refcount *refcount, - const void *server_data); + const void *server_data, gpr_arena *arena); /* implementation of grpc_transport_set_pollset */ void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_transport *self, -- cgit v1.2.3 From b18c8ba0b6bbc2317dd92353288f2b1c0ff353d5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Mar 2017 15:51:37 -0700 Subject: Fix cancellation --- src/core/lib/surface/call.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 8b2f9fd3f5..47f36be5fd 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1070,8 +1070,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; recv_trailing_filter(exec_ctx, call, md); - gpr_atm_rel_store(&call->received_final_op_atm, 1); /* propagate cancellation to any interested children */ + gpr_atm_rel_store(&call->received_final_op_atm, 1); gpr_mu_lock(&call->child_list_mu); child_call = call->first_child; if (child_call != NULL) { @@ -1079,7 +1079,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, next_child_call = child_call->sibling_next; if (child_call->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, + cancel_with_error(exec_ctx, child_call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); } -- cgit v1.2.3 From a6bec8fc3159b54fc447a029b6260b3d6c478e6b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 14 Mar 2017 08:26:04 -0700 Subject: Auto-call-arena-sizing --- src/core/lib/support/arena.c | 1 + src/core/lib/surface/call.c | 6 ++++-- src/core/lib/surface/channel.c | 32 ++++++++++++++++++++++++++++++++ src/core/lib/surface/channel.h | 3 +++ 4 files changed, 40 insertions(+), 2 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c index 6610acd3a0..8ce29aaab3 100644 --- a/src/core/lib/support/arena.c +++ b/src/core/lib/support/arena.c @@ -53,6 +53,7 @@ struct gpr_arena { gpr_arena *gpr_arena_create(size_t initial_size) { initial_size = ROUND_UP_TO_ALIGNMENT_SIZE(initial_size); + gpr_log(GPR_DEBUG, "arena create: %" PRIdPTR, initial_size); gpr_arena *a = gpr_zalloc(sizeof(gpr_arena) + initial_size); a->initial_zone.size_end = initial_size; return a; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 84c401db35..00719be9ab 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -277,7 +277,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, grpc_channel_get_channel_stack(args->channel); grpc_call *call; GPR_TIMER_BEGIN("grpc_call_create", 0); - gpr_arena *arena = gpr_arena_create(8192); + gpr_arena *arena = + gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel)); call = gpr_arena_alloc(arena, sizeof(grpc_call) + channel_stack->call_stack_size); call->arena = arena; @@ -436,8 +437,9 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { static void release_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_call *c = call; + grpc_channel_update_call_size_estimate(c->channel, + gpr_arena_destroy(c->arena)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); - gpr_arena_destroy(c->arena); } static void set_status_value_directly(grpc_status_code status, void *dest); diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index d6acd392c1..56beb7d96e 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -68,6 +68,8 @@ struct grpc_channel { grpc_compression_options compression_options; grpc_mdelem default_authority; + gpr_atm call_size_estimate; + gpr_mu registered_call_mu; registered_call *registered_calls; @@ -115,6 +117,10 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; + gpr_atm_no_barrier_store( + &channel->call_size_estimate, + CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size); + grpc_compression_options_init(&channel->compression_options); for (size_t i = 0; i < args->num_args; i++) { if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) { @@ -177,6 +183,32 @@ done: return channel; } +size_t grpc_channel_get_call_size_estimate(grpc_channel *channel) { +#define ROUND_UP_SIZE 256 + return ((size_t)gpr_atm_no_barrier_load(&channel->call_size_estimate) + + ROUND_UP_SIZE) & + ~(size_t)(ROUND_UP_SIZE - 1); +} + +void grpc_channel_update_call_size_estimate(grpc_channel *channel, + size_t size) { + size_t cur = (size_t)gpr_atm_no_barrier_load(&channel->call_size_estimate); + if (cur < size) { + /* size grew: update estimate */ + gpr_atm_no_barrier_cas(&channel->call_size_estimate, (gpr_atm)cur, + (gpr_atm)size); + /* if we lose: never mind, something else will likely update soon enough */ + } else if (cur == size) { + /* no change: holding pattern */ + } else if (cur > 0) { + /* size shrank: decrease estimate */ + gpr_atm_no_barrier_cas( + &channel->call_size_estimate, (gpr_atm)cur, + (gpr_atm)(GPR_MIN(cur - 1, (255 * cur + size) / 256))); + /* if we lose: never mind, something else will likely update soon enough */ + } +} + char *grpc_channel_get_target(grpc_channel *channel) { GRPC_API_TRACE("grpc_channel_get_target(channel=%p)", 1, (channel)); return gpr_strdup(channel->target); diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 3a441d7add..c4aebd8b9b 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -66,6 +66,9 @@ grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_exec_ctx *exec_ctx, grpc_channel *channel, int status_code); +size_t grpc_channel_get_call_size_estimate(grpc_channel *channel); +void grpc_channel_update_call_size_estimate(grpc_channel *channel, size_t size); + #ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel, -- cgit v1.2.3 From fb9d11204388b524b942950b47fef24116eab243 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 14 Mar 2017 09:26:27 -0700 Subject: Review comments --- src/core/ext/client_channel/client_channel.c | 4 ++-- src/core/ext/client_channel/subchannel.c | 14 +++++++------- .../ext/transport/chttp2/transport/incoming_metadata.c | 2 +- src/core/lib/support/arena.c | 1 - test/core/channel/channel_stack_test.c | 2 +- 5 files changed, 11 insertions(+), 12 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index f6550292a0..ba842c7916 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -755,7 +755,7 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { /* Create call on subchannel. */ grpc_subchannel_call *subchannel_call = NULL; - grpc_connected_subchannel_call_args call_args = { + const grpc_connected_subchannel_call_args call_args = { .pollent = calld->pollent, .path = calld->path, .start_time = calld->call_start_time, @@ -988,7 +988,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; - grpc_connected_subchannel_call_args call_args = { + const grpc_connected_subchannel_call_args call_args = { .pollent = calld->pollent, .path = calld->path, .start_time = calld->call_start_time, diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index e4b669c582..cef08a04ea 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -778,13 +778,13 @@ grpc_error *grpc_connected_subchannel_create_call( args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. - grpc_call_element_args call_args = {.call_stack = callstk, - .server_transport_data = NULL, - .context = NULL, - .path = args->path, - .start_time = args->start_time, - .deadline = args->deadline, - .arena = args->arena}; + const grpc_call_element_args call_args = {.call_stack = callstk, + .server_transport_data = NULL, + .context = NULL, + .path = args->path, + .start_time = args->start_time, + .deadline = args->deadline, + .arena = args->arena}; grpc_error *error = grpc_call_stack_init( exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args); if (error != GRPC_ERROR_NONE) { diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index ab900fdff2..da0a34d32a 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -42,8 +42,8 @@ void grpc_chttp2_incoming_metadata_buffer_init( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) { - grpc_metadata_batch_init(&buffer->batch); buffer->arena = arena; + grpc_metadata_batch_init(&buffer->batch); buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c index 8ce29aaab3..6610acd3a0 100644 --- a/src/core/lib/support/arena.c +++ b/src/core/lib/support/arena.c @@ -53,7 +53,6 @@ struct gpr_arena { gpr_arena *gpr_arena_create(size_t initial_size) { initial_size = ROUND_UP_TO_ALIGNMENT_SIZE(initial_size); - gpr_log(GPR_DEBUG, "arena create: %" PRIdPTR, initial_size); gpr_arena *a = gpr_zalloc(sizeof(gpr_arena) + initial_size); a->initial_zone.size_end = initial_size; return a; diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index 0c424c1ec4..af551c4928 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -139,7 +139,7 @@ static void test_create_channel_stack(void) { GPR_ASSERT(*channel_data == 0); call_stack = gpr_malloc(channel_stack->call_stack_size); - grpc_call_element_args args = { + const grpc_call_element_args args = { .call_stack = call_stack, .server_transport_data = NULL, .context = NULL, -- cgit v1.2.3 From d26250be35a0e0921cb97f43b13ce14343ee4c7d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 14 Mar 2017 11:01:52 -0700 Subject: Get growth right --- src/core/lib/support/arena.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c index 6610acd3a0..7bcb983f24 100644 --- a/src/core/lib/support/arena.c +++ b/src/core/lib/support/arena.c @@ -78,8 +78,7 @@ void *gpr_arena_alloc(gpr_arena *arena, size_t size) { while (start > z->size_end) { zone *next_z = (zone *)gpr_atm_acq_load(&z->next_atm); if (next_z == NULL) { - size_t next_z_size = - GPR_MAX((size_t)gpr_atm_no_barrier_load(&arena->size_so_far), size); + size_t next_z_size = (size_t)gpr_atm_no_barrier_load(&arena->size_so_far); next_z = gpr_zalloc(sizeof(zone) + next_z_size); next_z->size_begin = z->size_end; next_z->size_end = z->size_end + next_z_size; -- cgit v1.2.3 From 58796c8589d3a805032599ab8bf40a48360be091 Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Tue, 14 Mar 2017 20:58:24 +0100 Subject: G stands for... --- build.yaml | 2 +- src/core/lib/surface/version.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src/core/lib') diff --git a/build.yaml b/build.yaml index 7c43df3250..2c0f6db1ad 100644 --- a/build.yaml +++ b/build.yaml @@ -13,7 +13,7 @@ settings: '#09': Per-language overrides are possible with (eg) ruby_version tag here '#10': See the expand_version.py for all the quirks here core_version: 3.0.0-dev - g_stands_for: green + g_stands_for: gentle version: 1.3.0-dev filegroups: - name: census diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index 1143a9e044..ba80bd801e 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -38,4 +38,4 @@ const char *grpc_version_string(void) { return "3.0.0-dev"; } -const char *grpc_g_stands_for(void) { return "green"; } +const char *grpc_g_stands_for(void) { return "gentle"; } -- cgit v1.2.3 From 51006fea710ef7e4faefae653432f55947639b76 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 15 Mar 2017 08:07:02 -0700 Subject: Fix use-after-free --- src/core/lib/surface/call.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 00719be9ab..9a8870b86a 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -437,9 +437,10 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { static void release_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_call *c = call; - grpc_channel_update_call_size_estimate(c->channel, + grpc_channel *channel = c->channel; + grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); } static void set_status_value_directly(grpc_status_code status, void *dest); -- cgit v1.2.3 From dbad370aa5a5480824388d77207d3d0b2aa94a1c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 15 Mar 2017 08:21:19 -0700 Subject: Fix use-after-free, cronet compiles --- .../transport/cronet/transport/cronet_transport.c | 46 +++++++++++++--------- src/core/lib/surface/call.c | 3 +- 2 files changed, 29 insertions(+), 20 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 01a03533da..e827423a2a 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -177,6 +177,7 @@ struct op_storage { }; struct stream_obj { + gpr_arena *arena; struct op_and_state *oas; grpc_transport_stream_op *curr_op; grpc_cronet_transport *curr_ct; @@ -451,15 +452,18 @@ static void on_response_headers_received( gpr_mu_lock(&s->mu); memset(&s->state.rs.initial_metadata, 0, sizeof(s->state.rs.initial_metadata)); - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata); + grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, + s->arena); for (size_t i = 0; i < headers->count; i++) { - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.initial_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].key)), - grpc_slice_intern( - grpc_slice_from_static_string(headers->headers[i].value)))); + GRPC_LOG_IF_ERROR( + "on_response_headers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.initial_metadata, + grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].value))))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || @@ -549,17 +553,20 @@ static void on_response_trailers_received( memset(&s->state.rs.trailing_metadata, 0, sizeof(s->state.rs.trailing_metadata)); s->state.rs.trailing_metadata_valid = false; - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata); + grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata, + s->arena); for (size_t i = 0; i < trailers->count; i++) { CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, trailers->headers[i].value); - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.trailing_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].key)), - grpc_slice_intern( - grpc_slice_from_static_string(trailers->headers[i].value)))); + GRPC_LOG_IF_ERROR( + "on_response_trailers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.trailing_metadata, + grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].value))))); s->state.rs.trailing_metadata_valid = true; if (0 == strcmp(trailers->headers[i].key, "grpc-status") && 0 != strcmp(trailers->headers[i].value, "0")) { @@ -1174,7 +1181,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { stream_obj *s = (stream_obj *)gs; memset(&s->storage, 0, sizeof(s->storage)); s->storage.head = NULL; @@ -1194,6 +1201,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->curr_gs = gs; s->curr_ct = (grpc_cronet_transport *)gt; + s->arena = arena; gpr_mu_init(&s->mu); return 0; @@ -1238,9 +1246,11 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { + grpc_stream *gs, + grpc_closure *then_schedule_closure) { stream_obj *s = (stream_obj *)gs; GRPC_ERROR_UNREF(s->state.cancel_error); + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {} diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 9a8870b86a..f1285b923d 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -438,8 +438,7 @@ static void release_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_call *c = call; grpc_channel *channel = c->channel; - grpc_channel_update_call_size_estimate(channel, - gpr_arena_destroy(c->arena)); + grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); } -- cgit v1.2.3 From 2ccd502ed4e3fffa2cccf7d3e0aa7103caf37bf0 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 16 Mar 2017 09:57:46 -0700 Subject: Add cast --- src/core/lib/surface/channel.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core/lib') diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 56beb7d96e..2b700b2f67 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -119,7 +119,7 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, gpr_atm_no_barrier_store( &channel->call_size_estimate, - CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size); + (gpr_atm)CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size); grpc_compression_options_init(&channel->compression_options); for (size_t i = 0; i < args->num_args; i++) { -- cgit v1.2.3