aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-05-24 15:59:10 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-05-24 15:59:10 -0700
commitfe46136721acc3298a2949a7321d043180daed92 (patch)
tree2f6d7ba85a2ea51bbef0c298276ec6c83a6c663b /src/core/surface
parentc0cb6357e9c95893aae964599af7bb823a44976d (diff)
parent031dea1df4b6213b9f9779a824fccc6d348b8648 (diff)
Merge github.com:grpc/grpc into scalable-poll
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c61
-rw-r--r--src/core/surface/completion_queue.c4
-rw-r--r--src/core/surface/server.c4
3 files changed, 41 insertions, 28 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 50df36cae9..4d2ba7cd7d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -401,6 +401,7 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static int need_more_data(grpc_call *call) {
+ if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
(is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) ||
is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
@@ -408,8 +409,7 @@ static int need_more_data(grpc_call *call) {
is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
- (call->write_state == WRITE_STATE_INITIAL && !call->is_client &&
- call->read_state < READ_STATE_GOT_INITIAL_METADATA);
+ (call->write_state == WRITE_STATE_INITIAL && !call->is_client);
}
static void unlock(grpc_call *call) {
@@ -536,9 +536,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
switch ((grpc_ioreq_op)i) {
case GRPC_IOREQ_RECV_MESSAGE:
case GRPC_IOREQ_SEND_MESSAGE:
- if (master->success) {
- call->request_set[i] = REQSET_EMPTY;
- } else {
+ call->request_set[i] = REQSET_EMPTY;
+ if (!master->success) {
call->write_state = WRITE_STATE_WRITE_CLOSED;
}
break;
@@ -583,11 +582,29 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) {
}
}
+static void early_out_write_ops(grpc_call *call) {
+ switch (call->write_state) {
+ case WRITE_STATE_WRITE_CLOSED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
+ /* fallthrough */
+ case WRITE_STATE_STARTED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
+ /* fallthrough */
+ case WRITE_STATE_INITIAL:
+ /* do nothing */
+ break;
+ }
+}
+
static void call_on_done_send(void *pc, int success) {
grpc_call *call = pc;
lock(call);
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success);
+ call->write_state = WRITE_STATE_STARTED;
}
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success);
@@ -596,7 +613,13 @@ static void call_on_done_send(void *pc, int success) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success);
finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success);
finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ }
+ if (!success) {
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ early_out_write_ops(call);
}
+ call->send_ops.nops = 0;
call->last_send_contains = 0;
call->sending = 0;
unlock(call);
@@ -810,7 +833,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
op->send_ops = &call->send_ops;
op->bind_pollset = grpc_cq_pollset(call->cq);
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
- call->write_state = WRITE_STATE_STARTED;
call->send_initial_metadata_count = 0;
/* fall through intended */
case WRITE_STATE_STARTED:
@@ -826,7 +848,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
op->is_last_send = 1;
op->send_ops = &call->send_ops;
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
- call->write_state = WRITE_STATE_WRITE_CLOSED;
if (!call->is_client) {
/* send trailing metadata */
data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
@@ -918,23 +939,6 @@ static void finish_read_ops(grpc_call *call) {
}
}
-static void early_out_write_ops(grpc_call *call) {
- switch (call->write_state) {
- case WRITE_STATE_WRITE_CLOSED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
- /* fallthrough */
- case WRITE_STATE_STARTED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
- /* fallthrough */
- case WRITE_STATE_INITIAL:
- /* do nothing */
- break;
- }
-}
-
static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
size_t nreqs,
grpc_ioreq_completion_func completion,
@@ -1175,6 +1179,10 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, int success, void *tag) {
+ grpc_cq_end_op(call->cq, tag, call, success);
+}
+
+static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, 1);
}
@@ -1185,6 +1193,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t out;
const grpc_op *op;
grpc_ioreq *req;
+ void (*finish_func)(grpc_call *, int, void *) = finish_batch;
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
@@ -1268,6 +1277,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.recv_status_on_client.trailing_metadata;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
+ finish_func = finish_batch_with_close;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
req = &reqs[out++];
@@ -1277,13 +1287,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.recv_close_on_server.cancelled;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
+ finish_func = finish_batch_with_close;
break;
}
}
grpc_cq_begin_op(call->cq, call);
- return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch,
+ return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func,
tag);
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 48910afda3..8c9ca48a05 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -275,14 +275,14 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
return ret;
}
}
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
ret = ev->base;
gpr_free(ev);
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
return ret;
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 351ed5b758..24a23ae5c4 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -427,6 +427,8 @@ static void server_on_recv(void *ptr, int success) {
grpc_iomgr_add_callback(kill_zombie, elem);
} else if (calld->state == PENDING) {
call_list_remove(calld, PENDING_START);
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
}
gpr_mu_unlock(&chand->server->mu);
break;
@@ -663,7 +665,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host) {
registered_method *m;
if (!method) {
- gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
+ gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL");
return NULL;
}
for (m = server->registered_methods; m; m = m->next) {