aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-05 21:41:23 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-05 21:41:23 -0800
commit8b976d0c244e516c2195c699bf396cf1dac2baa5 (patch)
treef6e3a69f2fb200cb3d036844cf691eebbd7eb36f /src/core
parentc230a7451dbfc0627bdd3173118a91b075856619 (diff)
Fixes
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/client_channel.c41
-rw-r--r--src/core/surface/call.c11
-rw-r--r--src/core/surface/server.c9
3 files changed, 39 insertions, 22 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 507b91b8a6..c2c23f5156 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -210,11 +210,30 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
chand->waiting_child_count = new_count;
}
+static void send_up_cancelled_ops(grpc_call_element *elem) {
+ grpc_call_op finish_op;
+ channel_data *chand = elem->channel_data;
+ /* send up a synthesized status */
+ finish_op.type = GRPC_RECV_METADATA;
+ finish_op.dir = GRPC_CALL_UP;
+ finish_op.flags = 0;
+ finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status);
+ finish_op.done_cb = do_nothing;
+ finish_op.user_data = NULL;
+ grpc_call_next_op(elem, &finish_op);
+ /* send up a finish */
+ finish_op.type = GRPC_RECV_FINISH;
+ finish_op.dir = GRPC_CALL_UP;
+ finish_op.flags = 0;
+ finish_op.done_cb = do_nothing;
+ finish_op.user_data = NULL;
+ grpc_call_next_op(elem, &finish_op);
+}
+
static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_call_element *child_elem;
- grpc_call_op finish_op;
gpr_mu_lock(&chand->mu);
switch (calld->state) {
@@ -225,27 +244,15 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
return; /* early out */
case CALL_WAITING:
remove_waiting_child(chand, calld);
+ gpr_mu_unlock(&chand->mu);
+ send_up_cancelled_ops(elem);
calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data,
GRPC_OP_ERROR);
- /* fallthrough intended */
+ return; /* early out */
case CALL_CREATED:
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu);
- /* send up a synthesized status */
- finish_op.type = GRPC_RECV_METADATA;
- finish_op.dir = GRPC_CALL_UP;
- finish_op.flags = 0;
- finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status);
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- grpc_call_next_op(elem, &finish_op);
- /* send up a finish */
- finish_op.type = GRPC_RECV_FINISH;
- finish_op.dir = GRPC_CALL_UP;
- finish_op.flags = 0;
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- grpc_call_next_op(elem, &finish_op);
+ send_up_cancelled_ops(elem);
return; /* early out */
case CALL_CANCELLED:
gpr_mu_unlock(&chand->mu);
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index bcb7c87733..b3f272e068 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -295,10 +295,19 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
static void set_status_code(grpc_call *call, status_source source,
gpr_uint32 status) {
+ int flush;
+
call->status[source].is_set = 1;
call->status[source].code = status;
- if (status != GRPC_OP_OK) {
+ if (call->is_client) {
+ flush = status == GRPC_STATUS_CANCELLED;
+ } else {
+ flush = status != GRPC_STATUS_OK;
+ }
+
+ if (flush && !grpc_bbq_empty(&call->incoming_queue)) {
+ gpr_log(GPR_ERROR, "Flushing unread messages due to error status %d", status);
grpc_bbq_flush(&call->incoming_queue);
}
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index c0c524ad8d..ee0f96a580 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -346,6 +346,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
static void channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem, grpc_channel_op *op) {
channel_data *chand = elem->channel_data;
+ grpc_server *server = chand->server;
switch (op->type) {
case GRPC_ACCEPT_CALL:
@@ -356,11 +357,11 @@ static void channel_op(grpc_channel_element *elem,
case GRPC_TRANSPORT_CLOSED:
/* if the transport is closed for a server channel, we destroy the
channel */
- gpr_mu_lock(&chand->server->mu);
- server_ref(chand->server);
+ gpr_mu_lock(&server->mu);
+ server_ref(server);
destroy_channel(chand);
- gpr_mu_unlock(&chand->server->mu);
- server_unref(chand->server);
+ gpr_mu_unlock(&server->mu);
+ server_unref(server);
break;
case GRPC_TRANSPORT_GOAWAY:
gpr_slice_unref(op->data.goaway.message);