diff options
author | 2016-09-12 11:59:25 -0700 | |
---|---|---|
committer | 2016-09-12 11:59:25 -0700 | |
commit | 6e51f992c6bfdfba61d984ab173305da455bd2e7 (patch) | |
tree | 0ca276fbe0dac5c2be06d8ba74ac2193cdd3f550 /src/core/lib/surface | |
parent | ec5c93cabfbf535be2528df55ca8bb4500e6bc9b (diff) | |
parent | 537f7c2a136641487febeac89a25e430029eb40c (diff) |
Merge pull request #8068 from grpc/revert-7279-grand-unified-closures
Revert "Grand unified closures"
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.c | 63 | ||||
-rw-r--r-- | src/core/lib/surface/channel.c | 7 | ||||
-rw-r--r-- | src/core/lib/surface/channel_ping.c | 11 | ||||
-rw-r--r-- | src/core/lib/surface/init.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/lame_client.c | 6 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 69 |
6 files changed, 75 insertions, 83 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 119f5e82ab..772681109a 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -109,10 +109,6 @@ typedef struct batch_control { uint8_t recv_message; uint8_t recv_final_op; uint8_t is_notify_tag_closure; - - /* TODO(ctiller): now that this is inlined, figure out how much of the above - state can be eliminated */ - grpc_transport_stream_op op; } batch_control; struct grpc_call { @@ -782,7 +778,6 @@ typedef struct termination_closure { grpc_error *error; grpc_closure *op_closure; enum { TC_CANCEL, TC_CLOSE } type; - grpc_transport_stream_op op; } termination_closure; static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, @@ -802,24 +797,26 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, } static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { + grpc_transport_stream_op op; termination_closure *tc = tcp; - memset(&tc->op, 0, sizeof(tc->op)); - tc->op.cancel_error = tc->error; + memset(&op, 0, sizeof(op)); + op.cancel_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); - tc->op.on_complete = &tc->closure; - execute_op(exec_ctx, tc->call, &tc->op); + op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &op); } static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { + grpc_transport_stream_op op; termination_closure *tc = tcp; - memset(&tc->op, 0, sizeof(tc->op)); - tc->op.close_error = tc->error; + memset(&op, 0, sizeof(op)); + op.close_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); - tc->op_closure = tc->op.on_complete; - tc->op.on_complete = &tc->closure; - execute_op(exec_ctx, tc->call, &tc->op); + tc->op_closure = op.on_complete; + op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &op); } static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, @@ -1373,6 +1370,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_op *ops, size_t nops, void *notify_tag, int is_notify_tag_closure) { + grpc_transport_stream_op stream_op; size_t i; const grpc_op *op; batch_control *bctl; @@ -1386,6 +1384,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); + memset(&stream_op, 0, sizeof(stream_op)); + /* TODO(ctiller): this feels like it could be made lock-free */ gpr_mu_lock(&call->mu); bctl = allocate_batch_control(call); @@ -1394,9 +1394,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->notify_tag = notify_tag; bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); - grpc_transport_stream_op *stream_op = &bctl->op; - memset(stream_op, 0, sizeof(*stream_op)); - if (nops == 0) { GRPC_CALL_INTERNAL_REF(call, "completion"); bctl->error = GRPC_ERROR_NONE; @@ -1474,9 +1471,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } /* TODO(ctiller): just make these the same variable? */ call->metadata_batch[0][0].deadline = call->send_deadline; - stream_op->send_initial_metadata = + stream_op.send_initial_metadata = &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; - stream_op->send_initial_metadata_flags = op->flags; + stream_op.send_initial_metadata_flags = op->flags; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { @@ -1496,7 +1493,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_stream_init( &call->sending_stream, &op->data.send_message->data.raw.slice_buffer, op->flags); - stream_op->send_message = &call->sending_stream.base; + stream_op.send_message = &call->sending_stream.base; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ @@ -1514,7 +1511,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } bctl->send_final_op = 1; call->sent_final_op = 1; - stream_op->send_trailing_metadata = + stream_op.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_SEND_STATUS_FROM_SERVER: @@ -1561,7 +1558,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } - stream_op->send_trailing_metadata = + stream_op.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_RECV_INITIAL_METADATA: @@ -1579,9 +1576,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_closure_init(&call->receiving_initial_metadata_ready, receiving_initial_metadata_ready, bctl); bctl->recv_initial_metadata = 1; - stream_op->recv_initial_metadata = + stream_op.recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - stream_op->recv_initial_metadata_ready = + stream_op.recv_initial_metadata_ready = &call->receiving_initial_metadata_ready; num_completion_callbacks_needed++; break; @@ -1598,10 +1595,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->receiving_message = 1; bctl->recv_message = 1; call->receiving_buffer = op->data.recv_message; - stream_op->recv_message = &call->receiving_stream; + stream_op.recv_message = &call->receiving_stream; grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, bctl); - stream_op->recv_message_ready = &call->receiving_stream_ready; + stream_op.recv_message_ready = &call->receiving_stream_ready; num_completion_callbacks_needed++; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: @@ -1627,9 +1624,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->final_op.client.status_details_capacity = op->data.recv_status_on_client.status_details_capacity; bctl->recv_final_op = 1; - stream_op->recv_trailing_metadata = + stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op->collect_stats = + stream_op.collect_stats = &call->final_info.stats.transport_stream_stats; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: @@ -1650,9 +1647,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; bctl->recv_final_op = 1; - stream_op->recv_trailing_metadata = + stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op->collect_stats = + stream_op.collect_stats = &call->final_info.stats.transport_stream_stats; break; } @@ -1664,12 +1661,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); - stream_op->context = call->context; + stream_op.context = call->context; grpc_closure_init(&bctl->finish_batch, finish_batch, bctl); - stream_op->on_complete = &bctl->finish_batch; + stream_op.on_complete = &bctl->finish_batch; gpr_mu_unlock(&call->mu); - execute_op(exec_ctx, call, stream_op); + execute_op(exec_ctx, call, &stream_op); done: GPR_TIMER_END("grpc_call_start_batch", 0); diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 52e78567bd..6d2b1c4935 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -334,13 +334,14 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, } void grpc_channel_destroy(grpc_channel *channel) { - grpc_transport_op *op = grpc_make_transport_op(NULL); + grpc_transport_op op; grpc_channel_element *elem; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel)); - op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed"); + memset(&op, 0, sizeof(op)); + op.disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed"); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); - elem->filter->start_transport_op(&exec_ctx, elem, op); + elem->filter->start_transport_op(&exec_ctx, elem, &op); GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "channel"); diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 0d2f01a649..9818f9d2f2 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -61,20 +61,19 @@ static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, void *tag, void *reserved) { - GRPC_API_TRACE("grpc_channel_ping(channel=%p, cq=%p, tag=%p, reserved=%p)", 4, - (channel, cq, tag, reserved)); - grpc_transport_op *op = grpc_make_transport_op(NULL); + grpc_transport_op op; ping_result *pr = gpr_malloc(sizeof(*pr)); grpc_channel_element *top_elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_ASSERT(reserved == NULL); + memset(&op, 0, sizeof(op)); pr->tag = tag; pr->cq = cq; grpc_closure_init(&pr->closure, ping_done, pr); - op->send_ping = &pr->closure; - op->bind_pollset = grpc_cq_pollset(cq); + op.send_ping = &pr->closure; + op.bind_pollset = grpc_cq_pollset(cq); grpc_cq_begin_op(cq, tag); - top_elem->filter->start_transport_op(&exec_ctx, top_elem, op); + top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index edda0c85fa..5397913a21 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -47,7 +47,6 @@ #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/http/parser.h" -#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/profiling/timers.h" @@ -166,7 +165,6 @@ void grpc_init(void) { grpc_register_tracer("http1", &grpc_http1_trace); grpc_register_tracer("compression", &grpc_compression_trace); grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace); - grpc_register_tracer("combiner", &grpc_combiner_trace); // Default pluck trace to 1 grpc_cq_pluck_trace = 1; grpc_register_tracer("queue_timeout", &grpc_cq_event_timeout_trace); diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index d32c884e8e..19b78369dd 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -97,14 +97,14 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change, GRPC_ERROR_NONE, NULL); } + if (op->on_consumed != NULL) { + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + } if (op->send_ping != NULL) { grpc_exec_ctx_sched(exec_ctx, op->send_ping, GRPC_ERROR_CREATE("lame client channel"), NULL); } GRPC_ERROR_UNREF(op->disconnect_with_error); - if (op->on_consumed != NULL) { - grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); - } } static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 56fb80e92e..55e6d99057 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -273,20 +273,23 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, } static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, - int send_goaway, grpc_error *send_disconnect) { - struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc)); - grpc_closure_init(&sc->closure, shutdown_cleanup, sc); - grpc_transport_op *op = grpc_make_transport_op(&sc->closure); + bool send_goaway, grpc_error *send_disconnect) { + grpc_transport_op op; + struct shutdown_cleanup_args *sc; grpc_channel_element *elem; - op->send_goaway = send_goaway; + memset(&op, 0, sizeof(op)); + op.send_goaway = send_goaway; + sc = gpr_malloc(sizeof(*sc)); sc->slice = gpr_slice_from_copied_string("Server shutdown"); - op->goaway_message = &sc->slice; - op->goaway_status = GRPC_STATUS_OK; - op->disconnect_with_error = send_disconnect; + op.goaway_message = &sc->slice; + op.goaway_status = GRPC_STATUS_OK; + op.disconnect_with_error = send_disconnect; + grpc_closure_init(&sc->closure, shutdown_cleanup, sc); + op.on_consumed = &sc->closure; elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); - elem->filter->start_transport_op(exec_ctx, elem, op); + elem->filter->start_transport_op(exec_ctx, elem, &op); } static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, @@ -429,8 +432,7 @@ static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd, server_unref(exec_ctx, server); } -static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, - grpc_error *error) { +static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) { if (is_channel_orphaned(chand)) return; GPR_ASSERT(chand->server != NULL); orphan_channel(chand); @@ -439,20 +441,14 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - grpc_transport_op *op = - grpc_make_transport_op(&chand->finish_destroy_channel_closure); - op->set_accept_stream = true; + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.set_accept_stream = true; + op.on_consumed = &chand->finish_destroy_channel_closure; grpc_channel_next_op(exec_ctx, grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), - op); - - if (error != GRPC_ERROR_NONE) { - const char *msg = grpc_error_string(error); - gpr_log(GPR_INFO, "Disconnected client: %s", msg); - grpc_error_free_string(msg); - } - GRPC_ERROR_UNREF(error); + &op); } static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { @@ -849,16 +845,17 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, channel_data *chand = cd; grpc_server *server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { - grpc_transport_op *op = grpc_make_transport_op(NULL); - op->on_connectivity_state_change = &chand->channel_connectivity_changed, - op->connectivity_state = &chand->connectivity_state; + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.on_connectivity_state_change = &chand->channel_connectivity_changed, + op.connectivity_state = &chand->connectivity_state; grpc_channel_next_op(exec_ctx, grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), - op); + &op); } else { gpr_mu_lock(&server->mu_global); - destroy_channel(exec_ctx, chand, GRPC_ERROR_REF(error)); + destroy_channel(exec_ctx, chand); gpr_mu_unlock(&server->mu_global); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "connectivity"); } @@ -1122,7 +1119,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, size_t slots; uint32_t probes; uint32_t max_probes = 0; - grpc_transport_op *op = NULL; + grpc_transport_op op; channel = grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport); @@ -1182,16 +1179,16 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, gpr_mu_unlock(&s->mu_global); GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity"); - op = grpc_make_transport_op(NULL); - op->set_accept_stream = true; - op->set_accept_stream_fn = accept_stream; - op->set_accept_stream_user_data = chand; - op->on_connectivity_state_change = &chand->channel_connectivity_changed; - op->connectivity_state = &chand->connectivity_state; + memset(&op, 0, sizeof(op)); + op.set_accept_stream = true; + op.set_accept_stream_fn = accept_stream; + op.set_accept_stream_user_data = chand; + op.on_connectivity_state_change = &chand->channel_connectivity_changed; + op.connectivity_state = &chand->connectivity_state; if (gpr_atm_acq_load(&s->shutdown_flag) != 0) { - op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); + op.disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); } - grpc_transport_perform_op(exec_ctx, transport, op); + grpc_transport_perform_op(exec_ctx, transport, &op); } void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg, |