diff options
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.c | 63 | ||||
-rw-r--r-- | src/core/lib/surface/channel.c | 7 | ||||
-rw-r--r-- | src/core/lib/surface/channel_ping.c | 11 | ||||
-rw-r--r-- | src/core/lib/surface/init.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/lame_client.c | 6 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 69 |
6 files changed, 83 insertions, 75 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 772681109a..119f5e82ab 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -109,6 +109,10 @@ typedef struct batch_control { uint8_t recv_message; uint8_t recv_final_op; uint8_t is_notify_tag_closure; + + /* TODO(ctiller): now that this is inlined, figure out how much of the above + state can be eliminated */ + grpc_transport_stream_op op; } batch_control; struct grpc_call { @@ -778,6 +782,7 @@ typedef struct termination_closure { grpc_error *error; grpc_closure *op_closure; enum { TC_CANCEL, TC_CLOSE } type; + grpc_transport_stream_op op; } termination_closure; static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, @@ -797,26 +802,24 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, } static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { - grpc_transport_stream_op op; termination_closure *tc = tcp; - memset(&op, 0, sizeof(op)); - op.cancel_error = tc->error; + memset(&tc->op, 0, sizeof(tc->op)); + tc->op.cancel_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); - op.on_complete = &tc->closure; - execute_op(exec_ctx, tc->call, &op); + tc->op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &tc->op); } static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { - grpc_transport_stream_op op; termination_closure *tc = tcp; - memset(&op, 0, sizeof(op)); - op.close_error = tc->error; + memset(&tc->op, 0, sizeof(tc->op)); + tc->op.close_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); - tc->op_closure = op.on_complete; - op.on_complete = &tc->closure; - execute_op(exec_ctx, tc->call, &op); + tc->op_closure = tc->op.on_complete; + tc->op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &tc->op); } static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, @@ -1370,7 +1373,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_op *ops, size_t nops, void *notify_tag, int is_notify_tag_closure) { - grpc_transport_stream_op stream_op; size_t i; const grpc_op *op; batch_control *bctl; @@ -1384,8 +1386,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); - memset(&stream_op, 0, sizeof(stream_op)); - /* TODO(ctiller): this feels like it could be made lock-free */ gpr_mu_lock(&call->mu); bctl = allocate_batch_control(call); @@ -1394,6 +1394,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->notify_tag = notify_tag; bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); + grpc_transport_stream_op *stream_op = &bctl->op; + memset(stream_op, 0, sizeof(*stream_op)); + if (nops == 0) { GRPC_CALL_INTERNAL_REF(call, "completion"); bctl->error = GRPC_ERROR_NONE; @@ -1471,9 +1474,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } /* TODO(ctiller): just make these the same variable? */ call->metadata_batch[0][0].deadline = call->send_deadline; - stream_op.send_initial_metadata = + stream_op->send_initial_metadata = &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; - stream_op.send_initial_metadata_flags = op->flags; + stream_op->send_initial_metadata_flags = op->flags; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { @@ -1493,7 +1496,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_stream_init( &call->sending_stream, &op->data.send_message->data.raw.slice_buffer, op->flags); - stream_op.send_message = &call->sending_stream.base; + stream_op->send_message = &call->sending_stream.base; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ @@ -1511,7 +1514,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } bctl->send_final_op = 1; call->sent_final_op = 1; - stream_op.send_trailing_metadata = + stream_op->send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_SEND_STATUS_FROM_SERVER: @@ -1558,7 +1561,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } - stream_op.send_trailing_metadata = + stream_op->send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_RECV_INITIAL_METADATA: @@ -1576,9 +1579,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_closure_init(&call->receiving_initial_metadata_ready, receiving_initial_metadata_ready, bctl); bctl->recv_initial_metadata = 1; - stream_op.recv_initial_metadata = + stream_op->recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - stream_op.recv_initial_metadata_ready = + stream_op->recv_initial_metadata_ready = &call->receiving_initial_metadata_ready; num_completion_callbacks_needed++; break; @@ -1595,10 +1598,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->receiving_message = 1; bctl->recv_message = 1; call->receiving_buffer = op->data.recv_message; - stream_op.recv_message = &call->receiving_stream; + stream_op->recv_message = &call->receiving_stream; grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, bctl); - stream_op.recv_message_ready = &call->receiving_stream_ready; + stream_op->recv_message_ready = &call->receiving_stream_ready; num_completion_callbacks_needed++; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: @@ -1624,9 +1627,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->final_op.client.status_details_capacity = op->data.recv_status_on_client.status_details_capacity; bctl->recv_final_op = 1; - stream_op.recv_trailing_metadata = + stream_op->recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op.collect_stats = + stream_op->collect_stats = &call->final_info.stats.transport_stream_stats; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: @@ -1647,9 +1650,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; bctl->recv_final_op = 1; - stream_op.recv_trailing_metadata = + stream_op->recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op.collect_stats = + stream_op->collect_stats = &call->final_info.stats.transport_stream_stats; break; } @@ -1661,12 +1664,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); - stream_op.context = call->context; + stream_op->context = call->context; grpc_closure_init(&bctl->finish_batch, finish_batch, bctl); - stream_op.on_complete = &bctl->finish_batch; + stream_op->on_complete = &bctl->finish_batch; gpr_mu_unlock(&call->mu); - execute_op(exec_ctx, call, &stream_op); + execute_op(exec_ctx, call, stream_op); done: GPR_TIMER_END("grpc_call_start_batch", 0); diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 6d2b1c4935..52e78567bd 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -334,14 +334,13 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, } void grpc_channel_destroy(grpc_channel *channel) { - grpc_transport_op op; + grpc_transport_op *op = grpc_make_transport_op(NULL); grpc_channel_element *elem; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel)); - memset(&op, 0, sizeof(op)); - op.disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed"); + op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed"); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); - elem->filter->start_transport_op(&exec_ctx, elem, &op); + elem->filter->start_transport_op(&exec_ctx, elem, op); GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "channel"); diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 9818f9d2f2..0d2f01a649 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -61,19 +61,20 @@ static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, void *tag, void *reserved) { - grpc_transport_op op; + GRPC_API_TRACE("grpc_channel_ping(channel=%p, cq=%p, tag=%p, reserved=%p)", 4, + (channel, cq, tag, reserved)); + grpc_transport_op *op = grpc_make_transport_op(NULL); ping_result *pr = gpr_malloc(sizeof(*pr)); grpc_channel_element *top_elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_ASSERT(reserved == NULL); - memset(&op, 0, sizeof(op)); pr->tag = tag; pr->cq = cq; grpc_closure_init(&pr->closure, ping_done, pr); - op.send_ping = &pr->closure; - op.bind_pollset = grpc_cq_pollset(cq); + op->send_ping = &pr->closure; + op->bind_pollset = grpc_cq_pollset(cq); grpc_cq_begin_op(cq, tag); - top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op); + top_elem->filter->start_transport_op(&exec_ctx, top_elem, op); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 5397913a21..edda0c85fa 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -47,6 +47,7 @@ #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/profiling/timers.h" @@ -165,6 +166,7 @@ void grpc_init(void) { grpc_register_tracer("http1", &grpc_http1_trace); grpc_register_tracer("compression", &grpc_compression_trace); grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace); + grpc_register_tracer("combiner", &grpc_combiner_trace); // Default pluck trace to 1 grpc_cq_pluck_trace = 1; grpc_register_tracer("queue_timeout", &grpc_cq_event_timeout_trace); diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 19b78369dd..d32c884e8e 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -97,14 +97,14 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change, GRPC_ERROR_NONE, NULL); } - if (op->on_consumed != NULL) { - grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); - } if (op->send_ping != NULL) { grpc_exec_ctx_sched(exec_ctx, op->send_ping, GRPC_ERROR_CREATE("lame client channel"), NULL); } GRPC_ERROR_UNREF(op->disconnect_with_error); + if (op->on_consumed != NULL) { + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + } } static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 64afcecc07..0827a1e181 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -272,23 +272,20 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, } static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, - bool send_goaway, grpc_error *send_disconnect) { - grpc_transport_op op; - struct shutdown_cleanup_args *sc; + int send_goaway, grpc_error *send_disconnect) { + struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc)); + grpc_closure_init(&sc->closure, shutdown_cleanup, sc); + grpc_transport_op *op = grpc_make_transport_op(&sc->closure); grpc_channel_element *elem; - memset(&op, 0, sizeof(op)); - op.send_goaway = send_goaway; - sc = gpr_malloc(sizeof(*sc)); + op->send_goaway = send_goaway; sc->slice = gpr_slice_from_copied_string("Server shutdown"); - op.goaway_message = &sc->slice; - op.goaway_status = GRPC_STATUS_OK; - op.disconnect_with_error = send_disconnect; - grpc_closure_init(&sc->closure, shutdown_cleanup, sc); - op.on_consumed = &sc->closure; + op->goaway_message = &sc->slice; + op->goaway_status = GRPC_STATUS_OK; + op->disconnect_with_error = send_disconnect; elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); - elem->filter->start_transport_op(exec_ctx, elem, &op); + elem->filter->start_transport_op(exec_ctx, elem, op); } static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, @@ -431,7 +428,8 @@ static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd, server_unref(exec_ctx, server); } -static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) { +static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, + grpc_error *error) { if (is_channel_orphaned(chand)) return; GPR_ASSERT(chand->server != NULL); orphan_channel(chand); @@ -440,14 +438,20 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) { chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - grpc_transport_op op; - memset(&op, 0, sizeof(op)); - op.set_accept_stream = true; - op.on_consumed = &chand->finish_destroy_channel_closure; + grpc_transport_op *op = + grpc_make_transport_op(&chand->finish_destroy_channel_closure); + op->set_accept_stream = true; grpc_channel_next_op(exec_ctx, grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), - &op); + op); + + if (error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(error); + gpr_log(GPR_INFO, "Disconnected client: %s", msg); + grpc_error_free_string(msg); + } + GRPC_ERROR_UNREF(error); } static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { @@ -840,17 +844,16 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, channel_data *chand = cd; grpc_server *server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { - grpc_transport_op op; - memset(&op, 0, sizeof(op)); - op.on_connectivity_state_change = &chand->channel_connectivity_changed, - op.connectivity_state = &chand->connectivity_state; + grpc_transport_op *op = grpc_make_transport_op(NULL); + op->on_connectivity_state_change = &chand->channel_connectivity_changed, + op->connectivity_state = &chand->connectivity_state; grpc_channel_next_op(exec_ctx, grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), - &op); + op); } else { gpr_mu_lock(&server->mu_global); - destroy_channel(exec_ctx, chand); + destroy_channel(exec_ctx, chand, GRPC_ERROR_REF(error)); gpr_mu_unlock(&server->mu_global); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "connectivity"); } @@ -1114,7 +1117,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, size_t slots; uint32_t probes; uint32_t max_probes = 0; - grpc_transport_op op; + grpc_transport_op *op = NULL; channel = grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport); @@ -1174,16 +1177,16 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, gpr_mu_unlock(&s->mu_global); GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity"); - memset(&op, 0, sizeof(op)); - op.set_accept_stream = true; - op.set_accept_stream_fn = accept_stream; - op.set_accept_stream_user_data = chand; - op.on_connectivity_state_change = &chand->channel_connectivity_changed; - op.connectivity_state = &chand->connectivity_state; + op = grpc_make_transport_op(NULL); + op->set_accept_stream = true; + op->set_accept_stream_fn = accept_stream; + op->set_accept_stream_user_data = chand; + op->on_connectivity_state_change = &chand->channel_connectivity_changed; + op->connectivity_state = &chand->connectivity_state; if (gpr_atm_acq_load(&s->shutdown_flag) != 0) { - op.disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); + op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); } - grpc_transport_perform_op(exec_ctx, transport, &op); + grpc_transport_perform_op(exec_ctx, transport, op); } void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg, |