aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.c63
-rw-r--r--src/core/lib/surface/channel.c7
-rw-r--r--src/core/lib/surface/channel_ping.c9
-rw-r--r--src/core/lib/surface/server.c55
4 files changed, 65 insertions, 69 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index fc9df76dc1..36eec253c2 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -109,6 +109,10 @@ typedef struct batch_control {
uint8_t recv_message;
uint8_t recv_final_op;
uint8_t is_notify_tag_closure;
+
+ /* TODO(ctiller): now that this is inlined, figure out how much of the above
+ state can be eliminated */
+ grpc_transport_stream_op op;
} batch_control;
struct grpc_call {
@@ -762,6 +766,7 @@ typedef struct termination_closure {
grpc_error *error;
grpc_closure *op_closure;
enum { TC_CANCEL, TC_CLOSE } type;
+ grpc_transport_stream_op op;
} termination_closure;
static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
@@ -781,26 +786,24 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
}
static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
- grpc_transport_stream_op op;
termination_closure *tc = tcp;
- memset(&op, 0, sizeof(op));
- op.cancel_error = tc->error;
+ memset(&tc->op, 0, sizeof(tc->op));
+ tc->op.cancel_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
- op.on_complete = &tc->closure;
- execute_op(exec_ctx, tc->call, &op);
+ tc->op.on_complete = &tc->closure;
+ execute_op(exec_ctx, tc->call, &tc->op);
}
static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
- grpc_transport_stream_op op;
termination_closure *tc = tcp;
- memset(&op, 0, sizeof(op));
- op.close_error = tc->error;
+ memset(&tc->op, 0, sizeof(tc->op));
+ tc->op.close_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
- tc->op_closure = op.on_complete;
- op.on_complete = &tc->closure;
- execute_op(exec_ctx, tc->call, &op);
+ tc->op_closure = tc->op.on_complete;
+ tc->op.on_complete = &tc->closure;
+ execute_op(exec_ctx, tc->call, &tc->op);
}
static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
@@ -1354,7 +1357,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_call *call, const grpc_op *ops,
size_t nops, void *notify_tag,
int is_notify_tag_closure) {
- grpc_transport_stream_op stream_op;
size_t i;
const grpc_op *op;
batch_control *bctl;
@@ -1365,8 +1367,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
- memset(&stream_op, 0, sizeof(stream_op));
-
/* TODO(ctiller): this feels like it could be made lock-free */
gpr_mu_lock(&call->mu);
bctl = allocate_batch_control(call);
@@ -1375,6 +1375,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->notify_tag = notify_tag;
bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
+ grpc_transport_stream_op *stream_op = &bctl->op;
+ memset(stream_op, 0, sizeof(*stream_op));
+
if (nops == 0) {
GRPC_CALL_INTERNAL_REF(call, "completion");
bctl->error = GRPC_ERROR_NONE;
@@ -1453,9 +1456,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
/* TODO(ctiller): just make these the same variable? */
call->metadata_batch[0][0].deadline = call->send_deadline;
- stream_op.send_initial_metadata =
+ stream_op->send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
- stream_op.send_initial_metadata_flags = op->flags;
+ stream_op->send_initial_metadata_flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1475,7 +1478,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_stream_init(
&call->sending_stream,
&op->data.send_message->data.raw.slice_buffer, op->flags);
- stream_op.send_message = &call->sending_stream.base;
+ stream_op->send_message = &call->sending_stream.base;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
/* Flag validation: currently allow no flags */
@@ -1493,7 +1496,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
bctl->send_final_op = 1;
call->sent_final_op = 1;
- stream_op.send_trailing_metadata =
+ stream_op->send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
@@ -1540,7 +1543,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
- stream_op.send_trailing_metadata =
+ stream_op->send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
case GRPC_OP_RECV_INITIAL_METADATA:
@@ -1558,9 +1561,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&call->receiving_initial_metadata_ready,
receiving_initial_metadata_ready, bctl);
bctl->recv_initial_metadata = 1;
- stream_op.recv_initial_metadata =
+ stream_op->recv_initial_metadata =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- stream_op.recv_initial_metadata_ready =
+ stream_op->recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
num_completion_callbacks_needed++;
break;
@@ -1577,10 +1580,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->receiving_message = 1;
bctl->recv_message = 1;
call->receiving_buffer = op->data.recv_message;
- stream_op.recv_message = &call->receiving_stream;
+ stream_op->recv_message = &call->receiving_stream;
grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
bctl);
- stream_op.recv_message_ready = &call->receiving_stream_ready;
+ stream_op->recv_message_ready = &call->receiving_stream_ready;
num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
@@ -1606,9 +1609,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->final_op.client.status_details_capacity =
op->data.recv_status_on_client.status_details_capacity;
bctl->recv_final_op = 1;
- stream_op.recv_trailing_metadata =
+ stream_op->recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op.collect_stats = &call->stats.transport_stream_stats;
+ stream_op->collect_stats = &call->stats.transport_stream_stats;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
/* Flag validation: currently allow no flags */
@@ -1628,9 +1631,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
bctl->recv_final_op = 1;
- stream_op.recv_trailing_metadata =
+ stream_op->recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op.collect_stats = &call->stats.transport_stream_stats;
+ stream_op->collect_stats = &call->stats.transport_stream_stats;
break;
}
}
@@ -1641,12 +1644,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
- stream_op.context = call->context;
+ stream_op->context = call->context;
grpc_closure_init(&bctl->finish_batch, finish_batch, bctl);
- stream_op.on_complete = &bctl->finish_batch;
+ stream_op->on_complete = &bctl->finish_batch;
gpr_mu_unlock(&call->mu);
- execute_op(exec_ctx, call, &stream_op);
+ execute_op(exec_ctx, call, stream_op);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 6d2b1c4935..52e78567bd 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -334,14 +334,13 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
}
void grpc_channel_destroy(grpc_channel *channel) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel));
- memset(&op, 0, sizeof(op));
- op.disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
+ op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
- elem->filter->start_transport_op(&exec_ctx, elem, &op);
+ elem->filter->start_transport_op(&exec_ctx, elem, op);
GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "channel");
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 9818f9d2f2..5f9f07f89a 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -61,19 +61,18 @@ static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
void *tag, void *reserved) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
ping_result *pr = gpr_malloc(sizeof(*pr));
grpc_channel_element *top_elem =
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(reserved == NULL);
- memset(&op, 0, sizeof(op));
pr->tag = tag;
pr->cq = cq;
grpc_closure_init(&pr->closure, ping_done, pr);
- op.send_ping = &pr->closure;
- op.bind_pollset = grpc_cq_pollset(cq);
+ op->send_ping = &pr->closure;
+ op->bind_pollset = grpc_cq_pollset(cq);
grpc_cq_begin_op(cq, tag);
- top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op);
+ top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index d4421be936..2641604901 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -273,22 +273,19 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
int send_goaway, grpc_error *send_disconnect) {
- grpc_transport_op op;
- struct shutdown_cleanup_args *sc;
+ struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc));
+ grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
+ grpc_transport_op *op = grpc_make_transport_op(&sc->closure);
grpc_channel_element *elem;
- memset(&op, 0, sizeof(op));
- op.send_goaway = send_goaway;
- sc = gpr_malloc(sizeof(*sc));
+ op->send_goaway = send_goaway;
sc->slice = gpr_slice_from_copied_string("Server shutdown");
- op.goaway_message = &sc->slice;
- op.goaway_status = GRPC_STATUS_OK;
- op.disconnect_with_error = send_disconnect;
- grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
- op.on_consumed = &sc->closure;
+ op->goaway_message = &sc->slice;
+ op->goaway_status = GRPC_STATUS_OK;
+ op->disconnect_with_error = send_disconnect;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
}
static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
@@ -440,14 +437,13 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
- grpc_transport_op op;
- memset(&op, 0, sizeof(op));
- op.set_accept_stream = true;
- op.on_consumed = &chand->finish_destroy_channel_closure;
+ grpc_transport_op *op =
+ grpc_make_transport_op(&chand->finish_destroy_channel_closure);
+ op->set_accept_stream = true;
grpc_channel_next_op(exec_ctx,
grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
- &op);
+ op);
}
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
@@ -840,14 +836,13 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
channel_data *chand = cd;
grpc_server *server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
- grpc_transport_op op;
- memset(&op, 0, sizeof(op));
- op.on_connectivity_state_change = &chand->channel_connectivity_changed,
- op.connectivity_state = &chand->connectivity_state;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed,
+ op->connectivity_state = &chand->connectivity_state;
grpc_channel_next_op(exec_ctx,
grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
- &op);
+ op);
} else {
gpr_mu_lock(&server->mu_global);
destroy_channel(exec_ctx, chand);
@@ -1111,7 +1106,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
size_t slots;
uint32_t probes;
uint32_t max_probes = 0;
- grpc_transport_op op;
+ grpc_transport_op *op = NULL;
channel =
grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport);
@@ -1171,16 +1166,16 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
gpr_mu_unlock(&s->mu_global);
GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
- memset(&op, 0, sizeof(op));
- op.set_accept_stream = true;
- op.set_accept_stream_fn = accept_stream;
- op.set_accept_stream_user_data = chand;
- op.on_connectivity_state_change = &chand->channel_connectivity_changed;
- op.connectivity_state = &chand->connectivity_state;
+ op = grpc_make_transport_op(NULL);
+ op->set_accept_stream = true;
+ op->set_accept_stream_fn = accept_stream;
+ op->set_accept_stream_user_data = chand;
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed;
+ op->connectivity_state = &chand->connectivity_state;
if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
- op.disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
+ op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
}
- grpc_transport_perform_op(exec_ctx, transport, &op);
+ grpc_transport_perform_op(exec_ctx, transport, op);
}
void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,