aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-02-17 14:25:12 -0800
committerGravatar Craig Tiller <ctiller@google.com>2016-02-17 14:25:12 -0800
commitc0ce00fd164b3e40bb2c1046a452b4c12e295578 (patch)
tree721f298cd488e31d2351935a7309646c1b7ecd31 /src/core/surface
parent24e274b8d9c9617bc10b9dcde66864fb2884b58c (diff)
parentc77dc39771d0e0fb8a581636bb9c3dc1adef3fb1 (diff)
Merge github.com:grpc/grpc into sceq
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/alarm.c2
-rw-r--r--src/core/surface/call.c154
-rw-r--r--src/core/surface/lame_client.c3
-rw-r--r--src/core/surface/server.c4
-rw-r--r--src/core/surface/validate_metadata.c2
5 files changed, 107 insertions, 58 deletions
diff --git a/src/core/surface/alarm.c b/src/core/surface/alarm.c
index d753023ca9..fb496f6c47 100644
--- a/src/core/surface/alarm.c
+++ b/src/core/surface/alarm.c
@@ -63,9 +63,9 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
alarm->cq = cq;
alarm->tag = tag;
+ grpc_cq_begin_op(cq, tag);
grpc_timer_init(&exec_ctx, &alarm->alarm, deadline, alarm_cb, alarm,
gpr_now(GPR_CLOCK_MONOTONIC));
- grpc_cq_begin_op(cq, tag);
grpc_exec_ctx_finish(&exec_ctx);
return alarm;
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 9495e748b5..1b117aa6b8 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -159,6 +159,9 @@ struct grpc_call {
uint8_t receiving_message;
uint8_t received_final_op;
+ /* have we received initial metadata */
+ bool has_initial_md_been_received;
+
batch_control active_batches[MAX_CONCURRENT_BATCHES];
/* first idx: is_receiving, second idx: is_trailing */
@@ -200,6 +203,7 @@ struct grpc_call {
gpr_slice receiving_slice;
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
+ grpc_closure receiving_initial_metadata_ready;
uint32_t test_only_last_message_flags;
union {
@@ -212,6 +216,11 @@ struct grpc_call {
int *cancelled;
} server;
} final_op;
+
+ struct {
+ void *bctlp;
+ bool success;
+ } saved_receiving_stream_ready_ctx;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -993,6 +1002,94 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
+static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
+ bool success) {
+ grpc_call *call = bctl->call;
+ if (call->receiving_stream == NULL) {
+ *call->receiving_buffer = NULL;
+ call->receiving_message = 0;
+ if (gpr_unref(&bctl->steps_to_complete)) {
+ post_batch_completion(exec_ctx, bctl);
+ }
+ } else if (call->receiving_stream->length >
+ grpc_channel_get_max_message_length(call->channel)) {
+ cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
+ "Max message size exceeded");
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+ call->receiving_stream = NULL;
+ *call->receiving_buffer = NULL;
+ call->receiving_message = 0;
+ if (gpr_unref(&bctl->steps_to_complete)) {
+ post_batch_completion(exec_ctx, bctl);
+ }
+ } else {
+ call->test_only_last_message_flags = call->receiving_stream->flags;
+ if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
+ *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
+ NULL, 0, call->compression_algorithm);
+ } else {
+ *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
+ }
+ grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
+ bctl);
+ continue_receiving_slices(exec_ctx, bctl);
+ /* early out */
+ return;
+ }
+}
+
+static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
+ bool success) {
+ batch_control *bctl = bctlp;
+ grpc_call *call = bctl->call;
+
+ gpr_mu_lock(&bctl->call->mu);
+ if (bctl->call->has_initial_md_been_received) {
+ gpr_mu_unlock(&bctl->call->mu);
+ process_data_after_md(exec_ctx, bctlp, success);
+ } else {
+ call->saved_receiving_stream_ready_ctx.bctlp = bctlp;
+ call->saved_receiving_stream_ready_ctx.success = success;
+ gpr_mu_unlock(&bctl->call->mu);
+ }
+}
+
+static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
+ void *bctlp, bool success) {
+ batch_control *bctl = bctlp;
+ grpc_call *call = bctl->call;
+
+ gpr_mu_lock(&call->mu);
+
+ grpc_metadata_batch *md =
+ &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
+ grpc_metadata_batch_filter(md, recv_initial_filter, call);
+ call->has_initial_md_been_received = true;
+
+ if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
+ 0 &&
+ !call->is_client) {
+ GPR_TIMER_BEGIN("set_deadline_alarm", 0);
+ set_deadline_alarm(exec_ctx, call, md->deadline);
+ GPR_TIMER_END("set_deadline_alarm", 0);
+ }
+
+ if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) {
+ grpc_closure *saved_rsr_closure = grpc_closure_create(
+ receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp);
+ grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure,
+ call->saved_receiving_stream_ready_ctx.success, NULL);
+ call->saved_receiving_stream_ready_ctx.bctlp = NULL;
+ }
+
+ gpr_mu_unlock(&call->mu);
+
+ if (gpr_unref(&bctl->steps_to_complete)) {
+ post_batch_completion(exec_ctx, bctl);
+ }
+}
+
static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
@@ -1011,19 +1108,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
- if (bctl->recv_initial_metadata) {
- grpc_metadata_batch *md =
- &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- grpc_metadata_batch_filter(md, recv_initial_filter, call);
-
- if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
- 0 &&
- !call->is_client) {
- GPR_TIMER_BEGIN("set_deadline_alarm", 0);
- set_deadline_alarm(exec_ctx, call, md->deadline);
- GPR_TIMER_END("set_deadline_alarm", 0);
- }
- }
if (bctl->recv_final_op) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
@@ -1065,45 +1149,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
}
}
-static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- bool success) {
- batch_control *bctl = bctlp;
- grpc_call *call = bctl->call;
-
- if (call->receiving_stream == NULL) {
- *call->receiving_buffer = NULL;
- call->receiving_message = 0;
- if (gpr_unref(&bctl->steps_to_complete)) {
- post_batch_completion(exec_ctx, bctl);
- }
- } else if (call->receiving_stream->length >
- grpc_channel_get_max_message_length(call->channel)) {
- cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
- "Max message size exceeded");
- grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
- call->receiving_stream = NULL;
- *call->receiving_buffer = NULL;
- call->receiving_message = 0;
- if (gpr_unref(&bctl->steps_to_complete)) {
- post_batch_completion(exec_ctx, bctl);
- }
- } else {
- call->test_only_last_message_flags = call->receiving_stream->flags;
- if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
- (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
- *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
- NULL, 0, call->compression_algorithm);
- } else {
- *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
- }
- grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
- bctl);
- continue_receiving_slices(exec_ctx, bctl);
- /* early out */
- return;
- }
-}
-
static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_call *call, const grpc_op *ops,
size_t nops, void *notify_tag,
@@ -1273,9 +1318,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
call->received_initial_metadata = 1;
call->buffered_metadata[0] = op->data.recv_initial_metadata;
+ grpc_closure_init(&call->receiving_initial_metadata_ready,
+ receiving_initial_metadata_ready, bctl);
bctl->recv_initial_metadata = 1;
stream_op.recv_initial_metadata =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
+ stream_op.recv_initial_metadata_ready =
+ &call->receiving_initial_metadata_ready;
+ num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_MESSAGE:
/* Flag validation: currently allow no flags */
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 705996cad3..537069e984 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -78,8 +78,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
} else if (op->recv_trailing_metadata != NULL) {
fill_metadata(elem, op->recv_trailing_metadata);
}
- grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
- grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
}
static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 42cffccb4c..fb5e0d4b9e 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -596,8 +596,8 @@ static void server_mutate_op(grpc_call_element *elem,
if (op->recv_initial_metadata != NULL) {
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv_initial_metadata = op->on_complete;
- op->on_complete = &calld->server_on_recv_initial_metadata;
+ calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata;
}
}
diff --git a/src/core/surface/validate_metadata.c b/src/core/surface/validate_metadata.c
index df2e80b4b7..bf4126867f 100644
--- a/src/core/surface/validate_metadata.c
+++ b/src/core/surface/validate_metadata.c
@@ -50,7 +50,7 @@ static int conforms_to(const char *s, size_t len, const uint8_t *legal_bits) {
int grpc_header_key_is_legal(const char *key, size_t length) {
static const uint8_t legal_header_bits[256 / 8] = {
- 0x00, 0x00, 0x00, 0x00, 0x00, 0x20, 0xff, 0x03, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x60, 0xff, 0x03, 0x00, 0x00, 0x00,
0x80, 0xfe, 0xff, 0xff, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
if (length == 0) {