aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
authorGravatar Nicolas Noble <nicolasnoble@users.noreply.github.com>2016-09-12 11:59:25 -0700
committerGravatar GitHub <noreply@github.com>2016-09-12 11:59:25 -0700
commit6e51f992c6bfdfba61d984ab173305da455bd2e7 (patch)
tree0ca276fbe0dac5c2be06d8ba74ac2193cdd3f550 /src/core/lib/surface
parentec5c93cabfbf535be2528df55ca8bb4500e6bc9b (diff)
parent537f7c2a136641487febeac89a25e430029eb40c (diff)
Merge pull request #8068 from grpc/revert-7279-grand-unified-closures
Revert "Grand unified closures"
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.c63
-rw-r--r--src/core/lib/surface/channel.c7
-rw-r--r--src/core/lib/surface/channel_ping.c11
-rw-r--r--src/core/lib/surface/init.c2
-rw-r--r--src/core/lib/surface/lame_client.c6
-rw-r--r--src/core/lib/surface/server.c69
6 files changed, 75 insertions, 83 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 119f5e82ab..772681109a 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -109,10 +109,6 @@ typedef struct batch_control {
uint8_t recv_message;
uint8_t recv_final_op;
uint8_t is_notify_tag_closure;
-
- /* TODO(ctiller): now that this is inlined, figure out how much of the above
- state can be eliminated */
- grpc_transport_stream_op op;
} batch_control;
struct grpc_call {
@@ -782,7 +778,6 @@ typedef struct termination_closure {
grpc_error *error;
grpc_closure *op_closure;
enum { TC_CANCEL, TC_CLOSE } type;
- grpc_transport_stream_op op;
} termination_closure;
static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
@@ -802,24 +797,26 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
}
static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
+ grpc_transport_stream_op op;
termination_closure *tc = tcp;
- memset(&tc->op, 0, sizeof(tc->op));
- tc->op.cancel_error = tc->error;
+ memset(&op, 0, sizeof(op));
+ op.cancel_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
- tc->op.on_complete = &tc->closure;
- execute_op(exec_ctx, tc->call, &tc->op);
+ op.on_complete = &tc->closure;
+ execute_op(exec_ctx, tc->call, &op);
}
static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
+ grpc_transport_stream_op op;
termination_closure *tc = tcp;
- memset(&tc->op, 0, sizeof(tc->op));
- tc->op.close_error = tc->error;
+ memset(&op, 0, sizeof(op));
+ op.close_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
- tc->op_closure = tc->op.on_complete;
- tc->op.on_complete = &tc->closure;
- execute_op(exec_ctx, tc->call, &tc->op);
+ tc->op_closure = op.on_complete;
+ op.on_complete = &tc->closure;
+ execute_op(exec_ctx, tc->call, &op);
}
static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
@@ -1373,6 +1370,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_call *call, const grpc_op *ops,
size_t nops, void *notify_tag,
int is_notify_tag_closure) {
+ grpc_transport_stream_op stream_op;
size_t i;
const grpc_op *op;
batch_control *bctl;
@@ -1386,6 +1384,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
+ memset(&stream_op, 0, sizeof(stream_op));
+
/* TODO(ctiller): this feels like it could be made lock-free */
gpr_mu_lock(&call->mu);
bctl = allocate_batch_control(call);
@@ -1394,9 +1394,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->notify_tag = notify_tag;
bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
- grpc_transport_stream_op *stream_op = &bctl->op;
- memset(stream_op, 0, sizeof(*stream_op));
-
if (nops == 0) {
GRPC_CALL_INTERNAL_REF(call, "completion");
bctl->error = GRPC_ERROR_NONE;
@@ -1474,9 +1471,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
/* TODO(ctiller): just make these the same variable? */
call->metadata_batch[0][0].deadline = call->send_deadline;
- stream_op->send_initial_metadata =
+ stream_op.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
- stream_op->send_initial_metadata_flags = op->flags;
+ stream_op.send_initial_metadata_flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1496,7 +1493,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_stream_init(
&call->sending_stream,
&op->data.send_message->data.raw.slice_buffer, op->flags);
- stream_op->send_message = &call->sending_stream.base;
+ stream_op.send_message = &call->sending_stream.base;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
/* Flag validation: currently allow no flags */
@@ -1514,7 +1511,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
bctl->send_final_op = 1;
call->sent_final_op = 1;
- stream_op->send_trailing_metadata =
+ stream_op.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
@@ -1561,7 +1558,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
- stream_op->send_trailing_metadata =
+ stream_op.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
case GRPC_OP_RECV_INITIAL_METADATA:
@@ -1579,9 +1576,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&call->receiving_initial_metadata_ready,
receiving_initial_metadata_ready, bctl);
bctl->recv_initial_metadata = 1;
- stream_op->recv_initial_metadata =
+ stream_op.recv_initial_metadata =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- stream_op->recv_initial_metadata_ready =
+ stream_op.recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
num_completion_callbacks_needed++;
break;
@@ -1598,10 +1595,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->receiving_message = 1;
bctl->recv_message = 1;
call->receiving_buffer = op->data.recv_message;
- stream_op->recv_message = &call->receiving_stream;
+ stream_op.recv_message = &call->receiving_stream;
grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
bctl);
- stream_op->recv_message_ready = &call->receiving_stream_ready;
+ stream_op.recv_message_ready = &call->receiving_stream_ready;
num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
@@ -1627,9 +1624,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->final_op.client.status_details_capacity =
op->data.recv_status_on_client.status_details_capacity;
bctl->recv_final_op = 1;
- stream_op->recv_trailing_metadata =
+ stream_op.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op->collect_stats =
+ stream_op.collect_stats =
&call->final_info.stats.transport_stream_stats;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
@@ -1650,9 +1647,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
bctl->recv_final_op = 1;
- stream_op->recv_trailing_metadata =
+ stream_op.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op->collect_stats =
+ stream_op.collect_stats =
&call->final_info.stats.transport_stream_stats;
break;
}
@@ -1664,12 +1661,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
- stream_op->context = call->context;
+ stream_op.context = call->context;
grpc_closure_init(&bctl->finish_batch, finish_batch, bctl);
- stream_op->on_complete = &bctl->finish_batch;
+ stream_op.on_complete = &bctl->finish_batch;
gpr_mu_unlock(&call->mu);
- execute_op(exec_ctx, call, stream_op);
+ execute_op(exec_ctx, call, &stream_op);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 52e78567bd..6d2b1c4935 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -334,13 +334,14 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
}
void grpc_channel_destroy(grpc_channel *channel) {
- grpc_transport_op *op = grpc_make_transport_op(NULL);
+ grpc_transport_op op;
grpc_channel_element *elem;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel));
- op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
+ memset(&op, 0, sizeof(op));
+ op.disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
- elem->filter->start_transport_op(&exec_ctx, elem, op);
+ elem->filter->start_transport_op(&exec_ctx, elem, &op);
GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "channel");
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 0d2f01a649..9818f9d2f2 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -61,20 +61,19 @@ static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
void *tag, void *reserved) {
- GRPC_API_TRACE("grpc_channel_ping(channel=%p, cq=%p, tag=%p, reserved=%p)", 4,
- (channel, cq, tag, reserved));
- grpc_transport_op *op = grpc_make_transport_op(NULL);
+ grpc_transport_op op;
ping_result *pr = gpr_malloc(sizeof(*pr));
grpc_channel_element *top_elem =
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(reserved == NULL);
+ memset(&op, 0, sizeof(op));
pr->tag = tag;
pr->cq = cq;
grpc_closure_init(&pr->closure, ping_done, pr);
- op->send_ping = &pr->closure;
- op->bind_pollset = grpc_cq_pollset(cq);
+ op.send_ping = &pr->closure;
+ op.bind_pollset = grpc_cq_pollset(cq);
grpc_cq_begin_op(cq, tag);
- top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
+ top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index edda0c85fa..5397913a21 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -47,7 +47,6 @@
#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
-#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/profiling/timers.h"
@@ -166,7 +165,6 @@ void grpc_init(void) {
grpc_register_tracer("http1", &grpc_http1_trace);
grpc_register_tracer("compression", &grpc_compression_trace);
grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace);
- grpc_register_tracer("combiner", &grpc_combiner_trace);
// Default pluck trace to 1
grpc_cq_pluck_trace = 1;
grpc_register_tracer("queue_timeout", &grpc_cq_event_timeout_trace);
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index d32c884e8e..19b78369dd 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -97,14 +97,14 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change,
GRPC_ERROR_NONE, NULL);
}
+ if (op->on_consumed != NULL) {
+ grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
+ }
if (op->send_ping != NULL) {
grpc_exec_ctx_sched(exec_ctx, op->send_ping,
GRPC_ERROR_CREATE("lame client channel"), NULL);
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
- if (op->on_consumed != NULL) {
- grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
- }
}
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 56fb80e92e..55e6d99057 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -273,20 +273,23 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
}
static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
- int send_goaway, grpc_error *send_disconnect) {
- struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc));
- grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
- grpc_transport_op *op = grpc_make_transport_op(&sc->closure);
+ bool send_goaway, grpc_error *send_disconnect) {
+ grpc_transport_op op;
+ struct shutdown_cleanup_args *sc;
grpc_channel_element *elem;
- op->send_goaway = send_goaway;
+ memset(&op, 0, sizeof(op));
+ op.send_goaway = send_goaway;
+ sc = gpr_malloc(sizeof(*sc));
sc->slice = gpr_slice_from_copied_string("Server shutdown");
- op->goaway_message = &sc->slice;
- op->goaway_status = GRPC_STATUS_OK;
- op->disconnect_with_error = send_disconnect;
+ op.goaway_message = &sc->slice;
+ op.goaway_status = GRPC_STATUS_OK;
+ op.disconnect_with_error = send_disconnect;
+ grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
+ op.on_consumed = &sc->closure;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
- elem->filter->start_transport_op(exec_ctx, elem, op);
+ elem->filter->start_transport_op(exec_ctx, elem, &op);
}
static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
@@ -429,8 +432,7 @@ static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd,
server_unref(exec_ctx, server);
}
-static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
- grpc_error *error) {
+static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
if (is_channel_orphaned(chand)) return;
GPR_ASSERT(chand->server != NULL);
orphan_channel(chand);
@@ -439,20 +441,14 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
- grpc_transport_op *op =
- grpc_make_transport_op(&chand->finish_destroy_channel_closure);
- op->set_accept_stream = true;
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.set_accept_stream = true;
+ op.on_consumed = &chand->finish_destroy_channel_closure;
grpc_channel_next_op(exec_ctx,
grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
- op);
-
- if (error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(error);
- gpr_log(GPR_INFO, "Disconnected client: %s", msg);
- grpc_error_free_string(msg);
- }
- GRPC_ERROR_UNREF(error);
+ &op);
}
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
@@ -849,16 +845,17 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
channel_data *chand = cd;
grpc_server *server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
- grpc_transport_op *op = grpc_make_transport_op(NULL);
- op->on_connectivity_state_change = &chand->channel_connectivity_changed,
- op->connectivity_state = &chand->connectivity_state;
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.on_connectivity_state_change = &chand->channel_connectivity_changed,
+ op.connectivity_state = &chand->connectivity_state;
grpc_channel_next_op(exec_ctx,
grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
- op);
+ &op);
} else {
gpr_mu_lock(&server->mu_global);
- destroy_channel(exec_ctx, chand, GRPC_ERROR_REF(error));
+ destroy_channel(exec_ctx, chand);
gpr_mu_unlock(&server->mu_global);
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "connectivity");
}
@@ -1122,7 +1119,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
size_t slots;
uint32_t probes;
uint32_t max_probes = 0;
- grpc_transport_op *op = NULL;
+ grpc_transport_op op;
channel =
grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport);
@@ -1182,16 +1179,16 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
gpr_mu_unlock(&s->mu_global);
GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
- op = grpc_make_transport_op(NULL);
- op->set_accept_stream = true;
- op->set_accept_stream_fn = accept_stream;
- op->set_accept_stream_user_data = chand;
- op->on_connectivity_state_change = &chand->channel_connectivity_changed;
- op->connectivity_state = &chand->connectivity_state;
+ memset(&op, 0, sizeof(op));
+ op.set_accept_stream = true;
+ op.set_accept_stream_fn = accept_stream;
+ op.set_accept_stream_user_data = chand;
+ op.on_connectivity_state_change = &chand->channel_connectivity_changed;
+ op.connectivity_state = &chand->connectivity_state;
if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
- op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
+ op.disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
}
- grpc_transport_perform_op(exec_ctx, transport, op);
+ grpc_transport_perform_op(exec_ctx, transport, &op);
}
void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,