diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-05-24 15:59:10 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-05-24 15:59:10 -0700 |
commit | fe46136721acc3298a2949a7321d043180daed92 (patch) | |
tree | 2f6d7ba85a2ea51bbef0c298276ec6c83a6c663b /src/core/surface | |
parent | c0cb6357e9c95893aae964599af7bb823a44976d (diff) | |
parent | 031dea1df4b6213b9f9779a824fccc6d348b8648 (diff) |
Merge github.com:grpc/grpc into scalable-poll
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 61 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 4 | ||||
-rw-r--r-- | src/core/surface/server.c | 4 |
3 files changed, 41 insertions, 28 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 50df36cae9..4d2ba7cd7d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -401,6 +401,7 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) { static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } static int need_more_data(grpc_call *call) { + if (call->read_state == READ_STATE_STREAM_CLOSED) return 0; return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) || (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) || is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) || @@ -408,8 +409,7 @@ static int need_more_data(grpc_call *call) { is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || - (call->write_state == WRITE_STATE_INITIAL && !call->is_client && - call->read_state < READ_STATE_GOT_INITIAL_METADATA); + (call->write_state == WRITE_STATE_INITIAL && !call->is_client); } static void unlock(grpc_call *call) { @@ -536,9 +536,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, switch ((grpc_ioreq_op)i) { case GRPC_IOREQ_RECV_MESSAGE: case GRPC_IOREQ_SEND_MESSAGE: - if (master->success) { - call->request_set[i] = REQSET_EMPTY; - } else { + call->request_set[i] = REQSET_EMPTY; + if (!master->success) { call->write_state = WRITE_STATE_WRITE_CLOSED; } break; @@ -583,11 +582,29 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) { } } +static void early_out_write_ops(grpc_call *call) { + switch (call->write_state) { + case WRITE_STATE_WRITE_CLOSED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); + /* fallthrough */ + case WRITE_STATE_STARTED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); + /* fallthrough */ + case WRITE_STATE_INITIAL: + /* do nothing */ + break; + } +} + static void call_on_done_send(void *pc, int success) { grpc_call *call = pc; lock(call); if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) { finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success); + call->write_state = WRITE_STATE_STARTED; } if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) { finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success); @@ -596,7 +613,13 @@ static void call_on_done_send(void *pc, int success) { finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success); finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success); finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); + call->write_state = WRITE_STATE_WRITE_CLOSED; + } + if (!success) { + call->write_state = WRITE_STATE_WRITE_CLOSED; + early_out_write_ops(call); } + call->send_ops.nops = 0; call->last_send_contains = 0; call->sending = 0; unlock(call); @@ -810,7 +833,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { op->send_ops = &call->send_ops; op->bind_pollset = grpc_cq_pollset(call->cq); call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; - call->write_state = WRITE_STATE_STARTED; call->send_initial_metadata_count = 0; /* fall through intended */ case WRITE_STATE_STARTED: @@ -826,7 +848,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { op->is_last_send = 1; op->send_ops = &call->send_ops; call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE; - call->write_state = WRITE_STATE_WRITE_CLOSED; if (!call->is_client) { /* send trailing metadata */ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; @@ -918,23 +939,6 @@ static void finish_read_ops(grpc_call *call) { } } -static void early_out_write_ops(grpc_call *call) { - switch (call->write_state) { - case WRITE_STATE_WRITE_CLOSED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); - /* fallthrough */ - case WRITE_STATE_STARTED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); - /* fallthrough */ - case WRITE_STATE_INITIAL: - /* do nothing */ - break; - } -} - static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func completion, @@ -1175,6 +1179,10 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, int success, void *tag) { + grpc_cq_end_op(call->cq, tag, call, success); +} + +static void finish_batch_with_close(grpc_call *call, int success, void *tag) { grpc_cq_end_op(call->cq, tag, call, 1); } @@ -1185,6 +1193,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t out; const grpc_op *op; grpc_ioreq *req; + void (*finish_func)(grpc_call *, int, void *) = finish_batch; GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); @@ -1268,6 +1277,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.recv_status_on_client.trailing_metadata; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; + finish_func = finish_batch_with_close; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: req = &reqs[out++]; @@ -1277,13 +1287,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.recv_close_on_server.cancelled; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; + finish_func = finish_batch_with_close; break; } } grpc_cq_begin_op(call->cq, call); - return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch, + return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag); } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 48910afda3..8c9ca48a05 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -275,14 +275,14 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); + GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); return ret; } } gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); ret = ev->base; gpr_free(ev); - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); + GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); return ret; } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 351ed5b758..24a23ae5c4 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -427,6 +427,8 @@ static void server_on_recv(void *ptr, int success) { grpc_iomgr_add_callback(kill_zombie, elem); } else if (calld->state == PENDING) { call_list_remove(calld, PENDING_START); + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); } gpr_mu_unlock(&chand->server->mu); break; @@ -663,7 +665,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method, const char *host) { registered_method *m; if (!method) { - gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__); + gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL"); return NULL; } for (m = server->registered_methods; m; m = m->next) { |