aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/surface/byte_buffer.c10
-rw-r--r--src/core/surface/call.c149
2 files changed, 120 insertions, 39 deletions
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c
index 27a6c6e33d..4087622894 100644
--- a/src/core/surface/byte_buffer.c
+++ b/src/core/surface/byte_buffer.c
@@ -49,6 +49,16 @@ grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) {
return bb;
}
+grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
+ switch (bb->type) {
+ case GRPC_BB_SLICE_BUFFER:
+ return grpc_byte_buffer_create(bb->data.slice_buffer.slices, bb->data.slice_buffer.count);
+ }
+ gpr_log(GPR_INFO, "should never get here");
+ abort();
+ return NULL;
+}
+
void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
switch (bb->type) {
case GRPC_BB_SLICE_BUFFER:
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 9c5f5064eb..f6d93bd957 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -173,11 +173,14 @@ struct grpc_call {
/* protects variables in this section */
gpr_mu read_mu;
+ gpr_uint8 received_start;
+ gpr_uint8 start_ok;
gpr_uint8 reads_done;
gpr_uint8 received_finish;
gpr_uint8 received_metadata;
gpr_uint8 have_read;
gpr_uint8 have_alarm;
+ gpr_uint8 pending_writes_done;
/* The current outstanding read message tag (only valid if have_read == 1) */
void *read_tag;
void *metadata_tag;
@@ -189,6 +192,8 @@ struct grpc_call {
/* The current outstanding send message/context/invoke/end tag (only valid if
have_write == 1) */
void *write_tag;
+ grpc_byte_buffer *pending_write;
+ gpr_uint32 pending_write_flags;
/* The final status of the call */
grpc_status_code status_code;
@@ -230,6 +235,9 @@ grpc_call *grpc_call_create(grpc_channel *channel,
call->status_details = NULL;
call->received_finish = 0;
call->reads_done = 0;
+ call->received_start = 0;
+ call->pending_write = NULL;
+ call->pending_writes_done = 0;
grpc_metadata_buffer_init(&call->incoming_metadata);
gpr_ref_init(&call->internal_refcount, 1);
grpc_call_stack_init(channel_stack, server_transport_data,
@@ -330,16 +338,6 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
return GRPC_CALL_OK;
}
-static void done_invoke(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
-
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_invoke_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
static void finish_call(grpc_call *call) {
size_t count;
grpc_metadata *elements;
@@ -359,6 +357,88 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
void *invoke_accepted_tag,
void *metadata_read_tag,
void *finished_tag, gpr_uint32 flags) {
+ grpc_call_error err = grpc_call_invoke(call, cq, metadata_read_tag, finished_tag, flags);
+ if (err == GRPC_CALL_OK) {
+ grpc_cq_begin_op(call->cq, call, GRPC_INVOKE_ACCEPTED);
+ grpc_cq_end_invoke_accepted(call->cq, invoke_accepted_tag, call, do_nothing, NULL, GRPC_OP_OK);
+ }
+ return err;
+}
+
+static void done_write(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ void *tag = call->write_tag;
+
+ GPR_ASSERT(call->have_write);
+ call->have_write = 0;
+ call->write_tag = INVALID_TAG;
+ grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error);
+}
+
+static void done_writes_done(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ void *tag = call->write_tag;
+
+ GPR_ASSERT(call->have_write);
+ call->have_write = 0;
+ call->write_tag = INVALID_TAG;
+ grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error);
+}
+
+static void call_started(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ grpc_call_element *elem;
+ grpc_byte_buffer *pending_write = NULL;
+ gpr_uint32 pending_write_flags = 0;
+ gpr_uint8 pending_writes_done = 0;
+ int ok;
+ grpc_call_op op;
+
+ gpr_mu_lock(&call->read_mu);
+ GPR_ASSERT(!call->received_start);
+ call->received_start = 1;
+ ok = call->start_ok = (error == GRPC_OP_OK);
+ pending_write = call->pending_write;
+ pending_write_flags = call->pending_write_flags;
+ pending_writes_done = call->pending_writes_done;
+ gpr_mu_unlock(&call->read_mu);
+
+ if (pending_write) {
+ if (ok) {
+ op.type = GRPC_SEND_MESSAGE;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = pending_write_flags;
+ op.done_cb = done_write;
+ op.user_data = call;
+ op.data.message = pending_write;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ } else {
+ done_write(call, error);
+ }
+ grpc_byte_buffer_destroy(pending_write);
+ }
+ if (pending_writes_done) {
+ if (ok) {
+ op.type = GRPC_SEND_FINISH;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.done_cb = done_writes_done;
+ op.user_data = call;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ } else {
+ done_writes_done(call, error);
+ }
+ }
+}
+
+grpc_call_error grpc_call_invoke(grpc_call *call,
+ grpc_completion_queue *cq,
+ void *metadata_read_tag,
+ void *finished_tag, gpr_uint32 flags) {
grpc_call_element *elem;
grpc_call_op op;
@@ -390,7 +470,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
/* inform the completion queue of an incoming operation */
grpc_cq_begin_op(cq, call, GRPC_FINISHED);
grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
- grpc_cq_begin_op(cq, call, GRPC_INVOKE_ACCEPTED);
gpr_mu_lock(&call->read_mu);
@@ -401,8 +480,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
if (call->received_finish) {
/* handle early cancellation */
- grpc_cq_end_invoke_accepted(call->cq, invoke_accepted_tag, call, NULL, NULL,
- GRPC_OP_ERROR);
grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, NULL,
NULL, 0, NULL);
finish_call(call);
@@ -412,18 +489,15 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
return GRPC_CALL_OK;
}
- call->write_tag = invoke_accepted_tag;
call->metadata_tag = metadata_read_tag;
- call->have_write = 1;
-
gpr_mu_unlock(&call->read_mu);
/* call down the filter stack */
op.type = GRPC_SEND_START;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
- op.done_cb = done_invoke;
+ op.done_cb = call_started;
op.data.start.pollset = grpc_cq_pollset(cq);
op.user_data = call;
@@ -516,26 +590,6 @@ grpc_call_error grpc_call_accept(grpc_call *call, grpc_completion_queue *cq,
return GRPC_CALL_OK;
}
-static void done_writes_done(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
-
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
-static void done_write(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
-
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
void grpc_call_client_initial_metadata_complete(
grpc_call_element *surface_element) {
grpc_call *call = grpc_call_from_top_element(surface_element);
@@ -635,8 +689,6 @@ grpc_call_error grpc_call_start_write(grpc_call *call,
grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
- /* for now we do no buffering, so a NULL byte_buffer can have no impact
- on our behavior -- succeed immediately */
/* TODO(ctiller): if flags & GRPC_WRITE_BUFFER_HINT == 0, this indicates a
flush, and that flush should be propogated down from here */
if (byte_buffer == NULL) {
@@ -647,6 +699,15 @@ grpc_call_error grpc_call_start_write(grpc_call *call,
call->write_tag = tag;
call->have_write = 1;
+ gpr_mu_lock(&call->read_mu);
+ if (!call->received_start) {
+ call->pending_write = grpc_byte_buffer_copy(byte_buffer);
+ call->pending_write_flags = flags;
+
+ gpr_mu_unlock(&call->read_mu);
+ } else {
+ gpr_mu_unlock(&call->read_mu);
+
op.type = GRPC_SEND_MESSAGE;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
@@ -656,6 +717,7 @@ grpc_call_error grpc_call_start_write(grpc_call *call,
elem = CALL_ELEM_FROM_CALL(call, 0);
elem->filter->call_op(elem, NULL, &op);
+ }
return GRPC_CALL_OK;
}
@@ -687,6 +749,14 @@ grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) {
call->write_tag = tag;
call->have_write = 1;
+ gpr_mu_lock(&call->read_mu);
+ if (!call->received_start) {
+ call->pending_writes_done = 1;
+
+ gpr_mu_unlock(&call->read_mu);
+ } else {
+ gpr_mu_unlock(&call->read_mu);
+
op.type = GRPC_SEND_FINISH;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
@@ -695,6 +765,7 @@ grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) {
elem = CALL_ELEM_FROM_CALL(call, 0);
elem->filter->call_op(elem, NULL, &op);
+ }
return GRPC_CALL_OK;
}