aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-04-07 17:08:42 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-04-07 17:08:42 -0700
commiteb36437e85dbc35624102a0414ad9166046ba173 (patch)
tree24fe8219be5a701b914837e3f3a0f73a96d17097 /src/core/lib/surface
parentd2f1aa0275bbf7fbb5cc342198baf3cb25e5045f (diff)
parent4c40161d7597644b91cc8d225f09b139c7c7f22b (diff)
Merge github.com:grpc/grpc into c++lame
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.c308
-rw-r--r--src/core/lib/surface/channel.c15
-rw-r--r--src/core/lib/surface/completion_queue.c5
-rw-r--r--src/core/lib/surface/init.c4
-rw-r--r--src/core/lib/surface/lame_client.cc20
-rw-r--r--src/core/lib/surface/server.c44
6 files changed, 222 insertions, 174 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index a9317a4694..97d50a91be 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -117,37 +117,56 @@ static received_status unpack_received_status(gpr_atm atm) {
typedef struct batch_control {
grpc_call *call;
- grpc_cq_completion cq_completion;
+ /* Share memory for cq_completion and notify_tag as they are never needed
+ simultaneously. Each byte used in this data structure count as six bytes
+ per call, so any savings we can make are worthwhile,
+
+ We use notify_tag to determine whether or not to send notification to the
+ completion queue. Once we've made that determination, we can reuse the
+ memory for cq_completion. */
+ union {
+ grpc_cq_completion cq_completion;
+ struct {
+ /* Any given op indicates completion by either (a) calling a closure or
+ (b) sending a notification on the call's completion queue. If
+ \a is_closure is true, \a tag indicates a closure to be invoked;
+ otherwise, \a tag indicates the tag to be used in the notification to
+ be sent to the completion queue. */
+ void *tag;
+ bool is_closure;
+ } notify_tag;
+ } completion_data;
grpc_closure finish_batch;
- void *notify_tag;
gpr_refcount steps_to_complete;
grpc_error *errors[MAX_ERRORS_PER_BATCH];
gpr_atm num_errors;
- uint8_t send_initial_metadata;
- uint8_t send_message;
- uint8_t send_final_op;
- uint8_t recv_initial_metadata;
- uint8_t recv_message;
- uint8_t recv_final_op;
- uint8_t is_notify_tag_closure;
-
- /* TODO(ctiller): now that this is inlined, figure out how much of the above
- state can be eliminated */
- grpc_transport_stream_op op;
+ grpc_transport_stream_op_batch op;
} batch_control;
+typedef struct {
+ gpr_mu child_list_mu;
+ grpc_call *first_child;
+} parent_call;
+
+typedef struct {
+ grpc_call *parent;
+ /** siblings: children of the same parent form a list, and this list is
+ protected under
+ parent->mu */
+ grpc_call *sibling_next;
+ grpc_call *sibling_prev;
+} child_call;
+
struct grpc_call {
gpr_arena *arena;
grpc_completion_queue *cq;
grpc_polling_entity pollent;
grpc_channel *channel;
- grpc_call *parent;
- grpc_call *first_child;
gpr_timespec start_time;
- /* protects first_child, and child next/prev links */
- gpr_mu child_list_mu;
+ /* parent_call* */ gpr_atm parent_call_atm;
+ child_call *child_call;
/* client or server call */
bool is_client;
@@ -168,7 +187,8 @@ struct grpc_call {
/* have we received initial metadata */
bool has_initial_md_been_received;
- batch_control active_batches[MAX_CONCURRENT_BATCHES];
+ batch_control *active_batches[MAX_CONCURRENT_BATCHES];
+ grpc_transport_stream_op_batch_payload stream_op_payload;
/* first idx: is_receiving, second idx: is_trailing */
grpc_metadata_batch metadata_batch[2][2];
@@ -198,12 +218,6 @@ struct grpc_call {
int send_extra_metadata_count;
gpr_timespec send_deadline;
- /** siblings: children of the same parent form a list, and this list is
- protected under
- parent->mu */
- grpc_call *sibling_next;
- grpc_call *sibling_prev;
-
grpc_slice_buffer_stream sending_stream;
grpc_byte_stream *receiving_stream;
@@ -239,7 +253,7 @@ int grpc_call_error_trace = 0;
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op *op);
+ grpc_transport_stream_op_batch *op);
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_status_code status,
const char *description);
@@ -268,6 +282,23 @@ static void add_init_error(grpc_error **composite, grpc_error *new) {
*composite = grpc_error_add_child(*composite, new);
}
+static parent_call *get_or_create_parent_call(grpc_call *call) {
+ parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
+ if (p == NULL) {
+ p = gpr_arena_alloc(call->arena, sizeof(*p));
+ gpr_mu_init(&p->child_list_mu);
+ if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) {
+ gpr_mu_destroy(&p->child_list_mu);
+ p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
+ }
+ }
+ return p;
+}
+
+static parent_call *get_parent_call(grpc_call *call) {
+ return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
+}
+
grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
const grpc_call_create_args *args,
grpc_call **out_call) {
@@ -283,14 +314,13 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
sizeof(grpc_call) + channel_stack->call_stack_size);
call->arena = arena;
*out_call = call;
- gpr_mu_init(&call->child_list_mu);
call->channel = args->channel;
call->cq = args->cq;
- call->parent = args->parent_call;
call->start_time = gpr_now(GPR_CLOCK_MONOTONIC);
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
call->is_client = args->server_transport_data == NULL;
+ call->stream_op_payload.context = call->context;
grpc_slice path = grpc_empty_slice();
if (call->is_client) {
GPR_ASSERT(args->add_initial_metadata_count <
@@ -317,11 +347,17 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC);
if (args->parent_call != NULL) {
+ child_call *cc = call->child_call =
+ gpr_arena_alloc(arena, sizeof(child_call));
+ call->child_call->parent = args->parent_call;
+
GRPC_CALL_INTERNAL_REF(args->parent_call, "child");
GPR_ASSERT(call->is_client);
GPR_ASSERT(!args->parent_call->is_client);
- gpr_mu_lock(&args->parent_call->child_list_mu);
+ parent_call *pc = get_or_create_parent_call(args->parent_call);
+
+ gpr_mu_lock(&pc->child_list_mu);
if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
send_deadline = gpr_time_min(
@@ -355,17 +391,17 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
}
}
- if (args->parent_call->first_child == NULL) {
- args->parent_call->first_child = call;
- call->sibling_next = call->sibling_prev = call;
+ if (pc->first_child == NULL) {
+ pc->first_child = call;
+ cc->sibling_next = cc->sibling_prev = call;
} else {
- call->sibling_next = args->parent_call->first_child;
- call->sibling_prev = args->parent_call->first_child->sibling_prev;
- call->sibling_next->sibling_prev = call->sibling_prev->sibling_next =
- call;
+ cc->sibling_next = pc->first_child;
+ cc->sibling_prev = pc->first_child->child_call->sibling_prev;
+ cc->sibling_next->child_call->sibling_prev =
+ cc->sibling_prev->child_call->sibling_next = call;
}
- gpr_mu_unlock(&args->parent_call->child_list_mu);
+ gpr_mu_unlock(&pc->child_list_mu);
}
call->send_deadline = send_deadline;
@@ -460,7 +496,10 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
if (c->receiving_stream != NULL) {
grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
}
- gpr_mu_destroy(&c->child_list_mu);
+ parent_call *pc = get_parent_call(c);
+ if (pc != NULL) {
+ gpr_mu_destroy(&pc->child_list_mu);
+ }
for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md);
}
@@ -490,31 +529,31 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
}
void grpc_call_destroy(grpc_call *c) {
- int cancel;
- grpc_call *parent = c->parent;
+ child_call *cc = c->child_call;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_call_destroy", 0);
GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c));
- if (parent) {
- gpr_mu_lock(&parent->child_list_mu);
- if (c == parent->first_child) {
- parent->first_child = c->sibling_next;
- if (c == parent->first_child) {
- parent->first_child = NULL;
+ if (cc) {
+ parent_call *pc = get_parent_call(cc->parent);
+ gpr_mu_lock(&pc->child_list_mu);
+ if (c == pc->first_child) {
+ pc->first_child = cc->sibling_next;
+ if (c == pc->first_child) {
+ pc->first_child = NULL;
}
}
- c->sibling_prev->sibling_next = c->sibling_next;
- c->sibling_next->sibling_prev = c->sibling_prev;
- gpr_mu_unlock(&parent->child_list_mu);
- GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child");
+ cc->sibling_prev->child_call->sibling_next = cc->sibling_next;
+ cc->sibling_next->child_call->sibling_prev = cc->sibling_prev;
+ gpr_mu_unlock(&pc->child_list_mu);
+ GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child");
}
GPR_ASSERT(!c->destroy_called);
c->destroy_called = 1;
- cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) &&
- !gpr_atm_acq_load(&c->received_final_op_atm);
+ bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
+ gpr_atm_acq_load(&c->received_final_op_atm) == 0;
if (cancel) {
cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
GRPC_ERROR_CANCELLED);
@@ -535,13 +574,12 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
}
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
grpc_call_element *elem;
GPR_TIMER_BEGIN("execute_op", 0);
elem = CALL_ELEM_FROM_CALL(call, 0);
- op->context = call->context;
- elem->filter->start_transport_stream_op(exec_ctx, elem, op);
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
GPR_TIMER_END("execute_op", 0);
}
@@ -594,9 +632,10 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_error *error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
- grpc_transport_stream_op *op = grpc_make_transport_stream_op(
+ grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(
grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx));
- op->cancel_error = error;
+ op->cancel_stream = true;
+ op->payload->cancel_stream.cancel_error = error;
execute_op(exec_ctx, c, op);
}
@@ -1025,16 +1064,17 @@ static batch_control *allocate_batch_control(grpc_call *call,
const grpc_op *ops,
size_t num_ops) {
int slot = batch_slot_for_op(ops[0].op);
- for (size_t i = 1; i < num_ops; i++) {
- int op_slot = batch_slot_for_op(ops[i].op);
- slot = GPR_MIN(slot, op_slot);
+ batch_control **pslot = &call->active_batches[slot];
+ if (*pslot == NULL) {
+ *pslot = gpr_arena_alloc(call->arena, sizeof(batch_control));
}
- batch_control *bctl = &call->active_batches[slot];
+ batch_control *bctl = *pslot;
if (bctl->call != NULL) {
return NULL;
}
memset(bctl, 0, sizeof(*bctl));
bctl->call = call;
+ bctl->op.payload = &call->stream_op_payload;
return bctl;
}
@@ -1069,46 +1109,49 @@ static grpc_error *consolidate_batch_errors(batch_control *bctl) {
static void post_batch_completion(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
- grpc_call *child_call;
grpc_call *next_child_call;
grpc_call *call = bctl->call;
grpc_error *error = consolidate_batch_errors(bctl);
- if (bctl->send_initial_metadata) {
+ if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
- if (bctl->send_message) {
+ if (bctl->op.send_message) {
call->sending_message = false;
}
- if (bctl->send_final_op) {
+ if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy(
exec_ctx,
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
- if (bctl->recv_final_op) {
+ if (bctl->op.recv_trailing_metadata) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
recv_trailing_filter(exec_ctx, call, md);
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
- gpr_mu_lock(&call->child_list_mu);
- child_call = call->first_child;
- if (child_call != NULL) {
- do {
- next_child_call = child_call->sibling_next;
- if (child_call->cancellation_is_inherited) {
- GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
- cancel_with_error(exec_ctx, child_call, STATUS_FROM_API_OVERRIDE,
- GRPC_ERROR_CANCELLED);
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel");
- }
- child_call = next_child_call;
- } while (child_call != call->first_child);
+ parent_call *pc = get_parent_call(call);
+ if (pc != NULL) {
+ grpc_call *child;
+ gpr_mu_lock(&pc->child_list_mu);
+ child = pc->first_child;
+ if (child != NULL) {
+ do {
+ next_child_call = child->child_call->sibling_next;
+ if (child->cancellation_is_inherited) {
+ GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
+ cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE,
+ GRPC_ERROR_CANCELLED);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, child, "propagate_cancel");
+ }
+ child = next_child_call;
+ } while (child != pc->first_child);
+ }
+ gpr_mu_unlock(&pc->child_list_mu);
}
- gpr_mu_unlock(&call->child_list_mu);
if (call->is_client) {
get_final_status(call, set_status_value_directly,
@@ -1123,15 +1166,16 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
error = GRPC_ERROR_NONE;
}
- if (bctl->is_notify_tag_closure) {
+ if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs bctl->error */
bctl->call = NULL;
- grpc_closure_run(exec_ctx, bctl->notify_tag, error);
+ grpc_closure_run(exec_ctx, bctl->completion_data.notify_tag.tag, error);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
/* unrefs bctl->error */
- grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, error,
- finish_batch_completion, bctl, &bctl->cq_completion);
+ grpc_cq_end_op(
+ exec_ctx, bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
+ finish_batch_completion, bctl, &bctl->completion_data.cq_completion);
}
}
@@ -1374,11 +1418,13 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (bctl == NULL) {
return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
}
- bctl->notify_tag = notify_tag;
- bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
+ bctl->completion_data.notify_tag.tag = notify_tag;
+ bctl->completion_data.notify_tag.is_closure =
+ (uint8_t)(is_notify_tag_closure != 0);
- grpc_transport_stream_op *stream_op = &bctl->op;
- memset(stream_op, 0, sizeof(*stream_op));
+ grpc_transport_stream_op_batch *stream_op = &bctl->op;
+ grpc_transport_stream_op_batch_payload *stream_op_payload =
+ &call->stream_op_payload;
stream_op->covered_by_poller = true;
/* rewrite batch ops into a transport op */
@@ -1432,8 +1478,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
- bctl->send_initial_metadata = 1;
- call->sent_initial_metadata = 1;
+ stream_op->send_initial_metadata = true;
+ call->sent_initial_metadata = true;
if (!prepare_application_metadata(
exec_ctx, call, (int)op->data.send_initial_metadata.count,
op->data.send_initial_metadata.metadata, 0, call->is_client,
@@ -1443,9 +1489,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
/* TODO(ctiller): just make these the same variable? */
call->metadata_batch[0][0].deadline = call->send_deadline;
- stream_op->send_initial_metadata =
+ stream_op_payload->send_initial_metadata.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
- stream_op->send_initial_metadata_flags = op->flags;
+ stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
+ op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1460,8 +1507,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- bctl->send_message = 1;
- call->sending_message = 1;
+ stream_op->send_message = true;
+ call->sending_message = true;
grpc_slice_buffer_stream_init(
&call->sending_stream,
&op->data.send_message.send_message->data.raw.slice_buffer,
@@ -1473,7 +1520,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_COMPRESS_NONE) {
call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
- stream_op->send_message = &call->sending_stream.base;
+ stream_op_payload->send_message.send_message =
+ &call->sending_stream.base;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
/* Flag validation: currently allow no flags */
@@ -1489,9 +1537,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- bctl->send_final_op = 1;
- call->sent_final_op = 1;
- stream_op->send_trailing_metadata =
+ stream_op->send_trailing_metadata = true;
+ call->sent_final_op = true;
+ stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
@@ -1513,8 +1561,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
- bctl->send_final_op = 1;
- call->sent_final_op = 1;
+ stream_op->send_trailing_metadata = true;
+ call->sent_final_op = true;
GPR_ASSERT(call->send_extra_metadata_count == 0);
call->send_extra_metadata_count = 1;
call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
@@ -1553,7 +1601,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
- stream_op->send_trailing_metadata =
+ stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
case GRPC_OP_RECV_INITIAL_METADATA:
@@ -1570,16 +1618,16 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
from server.c. In that case, it's coming from accept_stream, and in
that case we're not necessarily covered by a poller. */
stream_op->covered_by_poller = call->is_client;
- call->received_initial_metadata = 1;
+ call->received_initial_metadata = true;
call->buffered_metadata[0] =
op->data.recv_initial_metadata.recv_initial_metadata;
grpc_closure_init(&call->receiving_initial_metadata_ready,
receiving_initial_metadata_ready, bctl,
grpc_schedule_on_exec_ctx);
- bctl->recv_initial_metadata = 1;
- stream_op->recv_initial_metadata =
+ stream_op->recv_initial_metadata = true;
+ stream_op_payload->recv_initial_metadata.recv_initial_metadata =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- stream_op->recv_initial_metadata_ready =
+ stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
num_completion_callbacks_needed++;
break;
@@ -1593,13 +1641,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- call->receiving_message = 1;
- bctl->recv_message = 1;
+ call->receiving_message = true;
+ stream_op->recv_message = true;
call->receiving_buffer = op->data.recv_message.recv_message;
- stream_op->recv_message = &call->receiving_stream;
+ stream_op_payload->recv_message.recv_message = &call->receiving_stream;
grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
bctl, grpc_schedule_on_exec_ctx);
- stream_op->recv_message_ready = &call->receiving_stream_ready;
+ stream_op_payload->recv_message.recv_message_ready =
+ &call->receiving_stream_ready;
num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
@@ -1616,16 +1665,17 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- call->requested_final_op = 1;
+ call->requested_final_op = true;
call->buffered_metadata[1] =
op->data.recv_status_on_client.trailing_metadata;
call->final_op.client.status = op->data.recv_status_on_client.status;
call->final_op.client.status_details =
op->data.recv_status_on_client.status_details;
- bctl->recv_final_op = 1;
- stream_op->recv_trailing_metadata =
+ stream_op->recv_trailing_metadata = true;
+ stream_op->collect_stats = true;
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op->collect_stats =
+ stream_op_payload->collect_stats.collect_stats =
&call->final_info.stats.transport_stream_stats;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
@@ -1642,13 +1692,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- call->requested_final_op = 1;
+ call->requested_final_op = true;
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
- bctl->recv_final_op = 1;
- stream_op->recv_trailing_metadata =
+ stream_op->recv_trailing_metadata = true;
+ stream_op->collect_stats = true;
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op->collect_stats =
+ stream_op_payload->collect_stats.collect_stats =
&call->final_info.stats.transport_stream_stats;
break;
}
@@ -1660,7 +1711,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
- stream_op->context = call->context;
grpc_closure_init(&bctl->finish_batch, finish_batch, bctl,
grpc_schedule_on_exec_ctx);
stream_op->on_complete = &bctl->finish_batch;
@@ -1674,26 +1724,26 @@ done:
done_with_error:
/* reverse any mutations that occured */
- if (bctl->send_initial_metadata) {
- call->sent_initial_metadata = 0;
+ if (stream_op->send_initial_metadata) {
+ call->sent_initial_metadata = false;
grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]);
}
- if (bctl->send_message) {
- call->sending_message = 0;
+ if (stream_op->send_message) {
+ call->sending_message = false;
grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base);
}
- if (bctl->send_final_op) {
- call->sent_final_op = 0;
+ if (stream_op->send_trailing_metadata) {
+ call->sent_final_op = false;
grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]);
}
- if (bctl->recv_initial_metadata) {
- call->received_initial_metadata = 0;
+ if (stream_op->recv_initial_metadata) {
+ call->received_initial_metadata = false;
}
- if (bctl->recv_message) {
- call->receiving_message = 0;
+ if (stream_op->recv_message) {
+ call->receiving_message = false;
}
- if (bctl->recv_final_op) {
- call->requested_final_op = 0;
+ if (stream_op->recv_trailing_metadata) {
+ call->requested_final_op = false;
}
goto done;
}
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index b4bfb92042..b3ba826bbc 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -150,17 +150,20 @@ grpc_channel *grpc_channel_create_with_builder(
} else if (0 == strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) {
channel->compression_options.default_level.is_set = true;
- GPR_ASSERT(args->args[i].value.integer >= 0 &&
- args->args[i].value.integer < GRPC_COMPRESS_LEVEL_COUNT);
channel->compression_options.default_level.level =
- (grpc_compression_level)args->args[i].value.integer;
+ (grpc_compression_level)grpc_channel_arg_get_integer(
+ &args->args[i],
+ (grpc_integer_options){GRPC_COMPRESS_LEVEL_NONE,
+ GRPC_COMPRESS_LEVEL_NONE,
+ GRPC_COMPRESS_LEVEL_COUNT - 1});
} else if (0 == strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) {
channel->compression_options.default_algorithm.is_set = true;
- GPR_ASSERT(args->args[i].value.integer >= 0 &&
- args->args[i].value.integer < GRPC_COMPRESS_ALGORITHMS_COUNT);
channel->compression_options.default_algorithm.algorithm =
- (grpc_compression_algorithm)args->args[i].value.integer;
+ (grpc_compression_algorithm)grpc_channel_arg_get_integer(
+ &args->args[i],
+ (grpc_integer_options){GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE,
+ GRPC_COMPRESS_ALGORITHMS_COUNT - 1});
} else if (0 ==
strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) {
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index b4594817e4..3273addf1d 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -345,7 +345,6 @@ static void dump_pending_tags(grpc_completion_queue *cc) {}
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
- grpc_pollset_worker *worker = NULL;
gpr_timespec now;
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@@ -426,8 +425,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_mu_lock(cc->mu);
continue;
} else {
- grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc),
- &worker, now, iteration_deadline);
+ grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL,
+ now, iteration_deadline);
if (err != GRPC_ERROR_NONE) {
gpr_mu_unlock(cc->mu);
const char *msg = grpc_error_string(err);
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index b46ecac18d..91bd014a0e 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -47,7 +47,6 @@
#include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/channel/http_server_filter.h"
-#include "src/core/lib/channel/max_age_filter.h"
#include "src/core/lib/channel/message_size_filter.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
@@ -115,9 +114,6 @@ static void register_builtin_channel_init() {
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
(void *)&grpc_server_deadline_filter);
grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
- (void *)&grpc_max_age_filter);
- grpc_channel_init_register_stage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
prepend_filter, (void *)&grpc_message_size_filter);
grpc_channel_init_register_stage(
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index 251b24d670..4616fc5830 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -90,15 +90,17 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
mdb->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
-static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op) {
- if (op->recv_initial_metadata != NULL) {
- fill_metadata(exec_ctx, elem, op->recv_initial_metadata);
- } else if (op->recv_trailing_metadata != NULL) {
- fill_metadata(exec_ctx, elem, op->recv_trailing_metadata);
+static void lame_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
+ if (op->recv_initial_metadata) {
+ fill_metadata(exec_ctx, elem,
+ op->payload->recv_initial_metadata.recv_initial_metadata);
+ } else if (op->recv_trailing_metadata) {
+ fill_metadata(exec_ctx, elem,
+ op->payload->recv_trailing_metadata.recv_trailing_metadata);
}
- grpc_transport_stream_op_finish_with_failure(
+ grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
}
@@ -159,7 +161,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
} // namespace grpc_core
extern "C" const grpc_channel_filter grpc_lame_filter = {
- grpc_core::lame_start_transport_stream_op,
+ grpc_core::lame_start_transport_stream_op_batch,
grpc_core::lame_start_transport_op,
sizeof(grpc_core::CallData),
grpc_core::init_call_elem,
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index a123c9ca43..191ee75252 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -154,8 +154,7 @@ struct call_data {
grpc_completion_queue *cq_new;
grpc_metadata_batch *recv_initial_metadata;
- bool recv_idempotent_request;
- bool recv_cacheable_request;
+ uint32_t recv_initial_metadata_flags;
grpc_metadata_array initial_metadata;
request_matcher *request_matcher;
@@ -498,13 +497,7 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
rc->data.batch.details->deadline = calld->deadline;
- rc->data.batch.details->flags =
- (calld->recv_idempotent_request
- ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
- : 0) |
- (calld->recv_cacheable_request
- ? GRPC_INITIAL_METADATA_CACHEABLE_REQUEST
- : 0);
+ rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
break;
case REGISTERED_CALL:
*rc->data.registered.deadline = calld->deadline;
@@ -632,7 +625,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
if (!grpc_slice_eq(rm->host, calld->host)) continue;
if (!grpc_slice_eq(rm->method, calld->path)) continue;
if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
- !calld->recv_idempotent_request) {
+ 0 == (calld->recv_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
continue;
}
finish_start_new_rpc(exec_ctx, server, elem,
@@ -649,7 +643,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
if (rm->has_host) continue;
if (!grpc_slice_eq(rm->method, calld->path)) continue;
if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
- !calld->recv_idempotent_request) {
+ 0 == (calld->recv_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
continue;
}
finish_start_new_rpc(exec_ctx, server, elem,
@@ -781,22 +776,25 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
}
static void server_mutate_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
- if (op->recv_initial_metadata != NULL) {
- GPR_ASSERT(op->recv_idempotent_request == NULL);
- calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
- op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata;
- op->recv_idempotent_request = &calld->recv_idempotent_request;
- op->recv_cacheable_request = &calld->recv_cacheable_request;
+ if (op->recv_initial_metadata) {
+ GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == NULL);
+ calld->recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata;
+ calld->on_done_recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->server_on_recv_initial_metadata;
+ op->payload->recv_initial_metadata.recv_flags =
+ &calld->recv_initial_metadata_flags;
}
}
-static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void server_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
@@ -960,7 +958,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
const grpc_channel_filter grpc_server_top_filter = {
- server_start_transport_stream_op,
+ server_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,