aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-01-14 23:39:00 -0800
committerGravatar Yang Gao <yangg@google.com>2015-01-14 23:39:00 -0800
commitea36ba3285f21a798ddefb68a9a465a108e1ffbb (patch)
tree5a8644c1d5a5794aeb081d62789cc199058d5046 /src
parente55e1155d3f8776b5645d0e741353da0820bdd92 (diff)
parent0fc50596c7803c2b33ed08d7c72c109f1ae76497 (diff)
Merge pull request #34 from ctiller/api
New C layer invoke API
Diffstat (limited to 'src')
-rw-r--r--src/core/surface/byte_buffer.c11
-rw-r--r--src/core/surface/call.c184
-rw-r--r--src/cpp/client/channel.cc15
-rw-r--r--src/cpp/stream/stream_context.cc12
-rw-r--r--src/cpp/stream/stream_context.h1
5 files changed, 140 insertions, 83 deletions
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c
index 27a6c6e33d..d1be41074d 100644
--- a/src/core/surface/byte_buffer.c
+++ b/src/core/surface/byte_buffer.c
@@ -49,6 +49,17 @@ grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) {
return bb;
}
+grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
+ switch (bb->type) {
+ case GRPC_BB_SLICE_BUFFER:
+ return grpc_byte_buffer_create(bb->data.slice_buffer.slices,
+ bb->data.slice_buffer.count);
+ }
+ gpr_log(GPR_INFO, "should never get here");
+ abort();
+ return NULL;
+}
+
void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
switch (bb->type) {
case GRPC_BB_SLICE_BUFFER:
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 0d72bf42fb..d8a34cf68d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -173,11 +173,14 @@ struct grpc_call {
/* protects variables in this section */
gpr_mu read_mu;
+ gpr_uint8 received_start;
+ gpr_uint8 start_ok;
gpr_uint8 reads_done;
gpr_uint8 received_finish;
gpr_uint8 received_metadata;
gpr_uint8 have_read;
gpr_uint8 have_alarm;
+ gpr_uint8 pending_writes_done;
gpr_uint8 got_status_code;
/* The current outstanding read message tag (only valid if have_read == 1) */
void *read_tag;
@@ -190,6 +193,8 @@ struct grpc_call {
/* The current outstanding send message/context/invoke/end tag (only valid if
have_write == 1) */
void *write_tag;
+ grpc_byte_buffer *pending_write;
+ gpr_uint32 pending_write_flags;
/* The final status of the call */
grpc_status_code status_code;
@@ -227,11 +232,15 @@ grpc_call *grpc_call_create(grpc_channel *channel,
call->have_alarm = 0;
call->received_metadata = 0;
call->got_status_code = 0;
+ call->start_ok = 0;
call->status_code =
server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN;
call->status_details = NULL;
call->received_finish = 0;
call->reads_done = 0;
+ call->received_start = 0;
+ call->pending_write = NULL;
+ call->pending_writes_done = 0;
grpc_metadata_buffer_init(&call->incoming_metadata);
gpr_ref_init(&call->internal_refcount, 1);
grpc_call_stack_init(channel_stack, server_transport_data,
@@ -360,16 +369,6 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
return GRPC_CALL_OK;
}
-static void done_invoke(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
-
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_invoke_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
static void finish_call(grpc_call *call) {
size_t count;
grpc_metadata *elements;
@@ -384,11 +383,81 @@ static void finish_call(grpc_call *call) {
elements, count);
}
-grpc_call_error grpc_call_start_invoke(grpc_call *call,
- grpc_completion_queue *cq,
- void *invoke_accepted_tag,
- void *metadata_read_tag,
- void *finished_tag, gpr_uint32 flags) {
+static void done_write(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ void *tag = call->write_tag;
+
+ GPR_ASSERT(call->have_write);
+ call->have_write = 0;
+ call->write_tag = INVALID_TAG;
+ grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error);
+}
+
+static void done_writes_done(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ void *tag = call->write_tag;
+
+ GPR_ASSERT(call->have_write);
+ call->have_write = 0;
+ call->write_tag = INVALID_TAG;
+ grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error);
+}
+
+static void call_started(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ grpc_call_element *elem;
+ grpc_byte_buffer *pending_write = NULL;
+ gpr_uint32 pending_write_flags = 0;
+ gpr_uint8 pending_writes_done = 0;
+ int ok;
+ grpc_call_op op;
+
+ gpr_mu_lock(&call->read_mu);
+ GPR_ASSERT(!call->received_start);
+ call->received_start = 1;
+ ok = call->start_ok = (error == GRPC_OP_OK);
+ pending_write = call->pending_write;
+ pending_write_flags = call->pending_write_flags;
+ pending_writes_done = call->pending_writes_done;
+ gpr_mu_unlock(&call->read_mu);
+
+ if (pending_write) {
+ if (ok) {
+ op.type = GRPC_SEND_MESSAGE;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = pending_write_flags;
+ op.done_cb = done_write;
+ op.user_data = call;
+ op.data.message = pending_write;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ } else {
+ done_write(call, error);
+ }
+ grpc_byte_buffer_destroy(pending_write);
+ }
+ if (pending_writes_done) {
+ if (ok) {
+ op.type = GRPC_SEND_FINISH;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.done_cb = done_writes_done;
+ op.user_data = call;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ } else {
+ done_writes_done(call, error);
+ }
+ }
+
+ grpc_call_internal_unref(call);
+}
+
+grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
+ void *metadata_read_tag, void *finished_tag,
+ gpr_uint32 flags) {
grpc_call_element *elem;
grpc_call_op op;
@@ -420,7 +489,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
/* inform the completion queue of an incoming operation */
grpc_cq_begin_op(cq, call, GRPC_FINISHED);
grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
- grpc_cq_begin_op(cq, call, GRPC_INVOKE_ACCEPTED);
gpr_mu_lock(&call->read_mu);
@@ -431,8 +499,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
if (call->received_finish) {
/* handle early cancellation */
- grpc_cq_end_invoke_accepted(call->cq, invoke_accepted_tag, call, NULL, NULL,
- GRPC_OP_ERROR);
grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, NULL,
NULL, 0, NULL);
finish_call(call);
@@ -442,20 +508,18 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
return GRPC_CALL_OK;
}
- call->write_tag = invoke_accepted_tag;
call->metadata_tag = metadata_read_tag;
- call->have_write = 1;
-
gpr_mu_unlock(&call->read_mu);
/* call down the filter stack */
op.type = GRPC_SEND_START;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
- op.done_cb = done_invoke;
+ op.done_cb = call_started;
op.data.start.pollset = grpc_cq_pollset(cq);
op.user_data = call;
+ grpc_call_internal_ref(call);
elem = CALL_ELEM_FROM_CALL(call, 0);
elem->filter->call_op(elem, NULL, &op);
@@ -486,6 +550,7 @@ grpc_call_error grpc_call_server_accept(grpc_call *call,
call->state = CALL_BOUNDCQ;
call->cq = cq;
call->finished_tag = finished_tag;
+ call->received_start = 1;
if (prq_is_empty(&call->prq) && call->received_finish) {
finish_call(call);
@@ -546,26 +611,6 @@ grpc_call_error grpc_call_accept(grpc_call *call, grpc_completion_queue *cq,
return GRPC_CALL_OK;
}
-static void done_writes_done(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
-
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
-static void done_write(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
-
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
void grpc_call_client_initial_metadata_complete(
grpc_call_element *surface_element) {
grpc_call *call = grpc_call_from_top_element(surface_element);
@@ -628,7 +673,7 @@ grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) {
} else {
call->read_tag = tag;
call->have_read = 1;
- request_more = 1;
+ request_more = call->received_start;
}
} else if (prq_is_empty(&call->prq) && call->received_finish) {
finish_call(call);
@@ -665,8 +710,6 @@ grpc_call_error grpc_call_start_write(grpc_call *call,
grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
- /* for now we do no buffering, so a NULL byte_buffer can have no impact
- on our behavior -- succeed immediately */
/* TODO(ctiller): if flags & GRPC_WRITE_BUFFER_HINT == 0, this indicates a
flush, and that flush should be propogated down from here */
if (byte_buffer == NULL) {
@@ -677,15 +720,25 @@ grpc_call_error grpc_call_start_write(grpc_call *call,
call->write_tag = tag;
call->have_write = 1;
- op.type = GRPC_SEND_MESSAGE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.done_cb = done_write;
- op.user_data = call;
- op.data.message = byte_buffer;
+ gpr_mu_lock(&call->read_mu);
+ if (!call->received_start) {
+ call->pending_write = grpc_byte_buffer_copy(byte_buffer);
+ call->pending_write_flags = flags;
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, &op);
+ gpr_mu_unlock(&call->read_mu);
+ } else {
+ gpr_mu_unlock(&call->read_mu);
+
+ op.type = GRPC_SEND_MESSAGE;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = flags;
+ op.done_cb = done_write;
+ op.user_data = call;
+ op.data.message = byte_buffer;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ }
return GRPC_CALL_OK;
}
@@ -717,14 +770,23 @@ grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) {
call->write_tag = tag;
call->have_write = 1;
- op.type = GRPC_SEND_FINISH;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = done_writes_done;
- op.user_data = call;
+ gpr_mu_lock(&call->read_mu);
+ if (!call->received_start) {
+ call->pending_writes_done = 1;
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, &op);
+ gpr_mu_unlock(&call->read_mu);
+ } else {
+ gpr_mu_unlock(&call->read_mu);
+
+ op.type = GRPC_SEND_FINISH;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.done_cb = done_writes_done;
+ op.user_data = call;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ }
return GRPC_CALL_OK;
}
@@ -829,6 +891,8 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
grpc_mdelem *md = op->data.metadata;
grpc_mdstr *key = md->key;
+ gpr_log(GPR_DEBUG, "call %p got metadata %s %s", call,
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value));
if (key == grpc_channel_get_status_string(call->channel)) {
maybe_set_status_code(call, decode_status(md));
grpc_mdelem_unref(md);
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index ddda8c22d6..f476f77a49 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -104,7 +104,6 @@ Status Channel::StartBlockingRpc(const RpcMethod& method,
context->set_call(call);
grpc_event* ev;
void* finished_tag = reinterpret_cast<char*>(call);
- void* invoke_tag = reinterpret_cast<char*>(call) + 1;
void* metadata_read_tag = reinterpret_cast<char*>(call) + 2;
void* write_tag = reinterpret_cast<char*>(call) + 3;
void* halfclose_tag = reinterpret_cast<char*>(call) + 4;
@@ -115,19 +114,11 @@ Status Channel::StartBlockingRpc(const RpcMethod& method,
// add_metadata from context
//
// invoke
- GPR_ASSERT(grpc_call_start_invoke(call, cq, invoke_tag, metadata_read_tag,
- finished_tag,
- GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
- ev = grpc_completion_queue_pluck(cq, invoke_tag, gpr_inf_future);
- bool success = ev->data.invoke_accepted == GRPC_OP_OK;
- grpc_event_finish(ev);
- if (!success) {
- GetFinalStatus(cq, finished_tag, &status);
- return status;
- }
+ GPR_ASSERT(grpc_call_invoke(call, cq, metadata_read_tag, finished_tag,
+ GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
// write request
grpc_byte_buffer* write_buffer = nullptr;
- success = SerializeProto(request, &write_buffer);
+ bool success = SerializeProto(request, &write_buffer);
if (!success) {
grpc_call_cancel(call);
status =
diff --git a/src/cpp/stream/stream_context.cc b/src/cpp/stream/stream_context.cc
index 5ccf8c9682..ebe71594c0 100644
--- a/src/cpp/stream/stream_context.cc
+++ b/src/cpp/stream/stream_context.cc
@@ -80,17 +80,9 @@ void StreamContext::Start(bool buffered) {
if (is_client_) {
// TODO(yangg) handle metadata send path
int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0;
- grpc_call_error error = grpc_call_start_invoke(call(), cq(), invoke_tag(),
- client_metadata_read_tag(),
- finished_tag(), flag);
+ grpc_call_error error = grpc_call_invoke(
+ call(), cq(), client_metadata_read_tag(), finished_tag(), flag);
GPR_ASSERT(GRPC_CALL_OK == error);
- grpc_event* invoke_ev =
- grpc_completion_queue_pluck(cq(), invoke_tag(), gpr_inf_future);
- if (invoke_ev->data.invoke_accepted != GRPC_OP_OK) {
- peer_halfclosed_ = true;
- self_halfclosed_ = true;
- }
- grpc_event_finish(invoke_ev);
} else {
// TODO(yangg) metadata needs to be added before accept
// TODO(yangg) correctly set flag to accept
diff --git a/src/cpp/stream/stream_context.h b/src/cpp/stream/stream_context.h
index 4781f27a77..d6bd7a2370 100644
--- a/src/cpp/stream/stream_context.h
+++ b/src/cpp/stream/stream_context.h
@@ -76,7 +76,6 @@ class StreamContext final : public StreamContextInterface {
void* read_tag() { return reinterpret_cast<char*>(this) + 1; }
void* write_tag() { return reinterpret_cast<char*>(this) + 2; }
void* halfclose_tag() { return reinterpret_cast<char*>(this) + 3; }
- void* invoke_tag() { return reinterpret_cast<char*>(this) + 4; }
void* client_metadata_read_tag() { return reinterpret_cast<char*>(this) + 5; }
grpc_call* call() { return call_; }
grpc_completion_queue* cq() { return cq_; }