diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/support/subprocess_posix.c | 6 | ||||
-rw-r--r-- | src/core/surface/call.c | 56 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 2 | ||||
-rw-r--r-- | src/node/src/client.js | 56 |
4 files changed, 76 insertions, 44 deletions
diff --git a/src/core/support/subprocess_posix.c b/src/core/support/subprocess_posix.c index 642520bb47..b4631fa0ed 100644 --- a/src/core/support/subprocess_posix.c +++ b/src/core/support/subprocess_posix.c @@ -57,7 +57,7 @@ struct gpr_subprocess { const char *gpr_subprocess_binary_extension() { return ""; } -gpr_subprocess *gpr_subprocess_create(int argc, char **argv) { +gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) { gpr_subprocess *r; int pid; char **exec_args; @@ -92,7 +92,11 @@ void gpr_subprocess_destroy(gpr_subprocess *p) { int gpr_subprocess_join(gpr_subprocess *p) { int status; +retry: if (waitpid(p->pid, &status, 0) == -1) { + if (errno == EINTR) { + goto retry; + } gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno)); return -1; } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index ec6fd65ea3..4d2ba7cd7d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -536,9 +536,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, switch ((grpc_ioreq_op)i) { case GRPC_IOREQ_RECV_MESSAGE: case GRPC_IOREQ_SEND_MESSAGE: - if (master->success) { - call->request_set[i] = REQSET_EMPTY; - } else { + call->request_set[i] = REQSET_EMPTY; + if (!master->success) { call->write_state = WRITE_STATE_WRITE_CLOSED; } break; @@ -583,11 +582,29 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) { } } +static void early_out_write_ops(grpc_call *call) { + switch (call->write_state) { + case WRITE_STATE_WRITE_CLOSED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); + /* fallthrough */ + case WRITE_STATE_STARTED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); + /* fallthrough */ + case WRITE_STATE_INITIAL: + /* do nothing */ + break; + } +} + static void call_on_done_send(void *pc, int success) { grpc_call *call = pc; lock(call); if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) { finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success); + call->write_state = WRITE_STATE_STARTED; } if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) { finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success); @@ -596,6 +613,11 @@ static void call_on_done_send(void *pc, int success) { finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success); finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success); finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); + call->write_state = WRITE_STATE_WRITE_CLOSED; + } + if (!success) { + call->write_state = WRITE_STATE_WRITE_CLOSED; + early_out_write_ops(call); } call->send_ops.nops = 0; call->last_send_contains = 0; @@ -811,7 +833,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { op->send_ops = &call->send_ops; op->bind_pollset = grpc_cq_pollset(call->cq); call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; - call->write_state = WRITE_STATE_STARTED; call->send_initial_metadata_count = 0; /* fall through intended */ case WRITE_STATE_STARTED: @@ -827,7 +848,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { op->is_last_send = 1; op->send_ops = &call->send_ops; call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE; - call->write_state = WRITE_STATE_WRITE_CLOSED; if (!call->is_client) { /* send trailing metadata */ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; @@ -919,23 +939,6 @@ static void finish_read_ops(grpc_call *call) { } } -static void early_out_write_ops(grpc_call *call) { - switch (call->write_state) { - case WRITE_STATE_WRITE_CLOSED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); - /* fallthrough */ - case WRITE_STATE_STARTED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); - /* fallthrough */ - case WRITE_STATE_INITIAL: - /* do nothing */ - break; - } -} - static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func completion, @@ -1176,6 +1179,10 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, int success, void *tag) { + grpc_cq_end_op(call->cq, tag, call, success); +} + +static void finish_batch_with_close(grpc_call *call, int success, void *tag) { grpc_cq_end_op(call->cq, tag, call, 1); } @@ -1186,6 +1193,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t out; const grpc_op *op; grpc_ioreq *req; + void (*finish_func)(grpc_call *, int, void *) = finish_batch; GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); @@ -1269,6 +1277,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.recv_status_on_client.trailing_metadata; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; + finish_func = finish_batch_with_close; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: req = &reqs[out++]; @@ -1278,13 +1287,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.recv_close_on_server.cancelled; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; + finish_func = finish_batch_with_close; break; } } grpc_cq_begin_op(call->cq, call); - return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch, + return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag); } diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 62f4020d7e..e66b4ed2d8 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -149,7 +149,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { } buf.AddServerSendStatus(&ctx_.trailing_metadata_, status); call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); + cq_.Pluck(&buf); /* status ignored */ void* ignored_tag; bool ignored_ok; cq_.Shutdown(); diff --git a/src/node/src/client.js b/src/node/src/client.js index 46d476b9f4..efec05bbf3 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -81,7 +81,8 @@ function _write(chunk, encoding, callback) { batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); this.call.startBatch(batch, function(err, event) { if (err) { - throw err; + // Something has gone wrong. Stop writing by failing to call callback + return; } callback(); }); @@ -120,7 +121,9 @@ function _read(size) { */ function readCallback(err, event) { if (err) { - throw err; + // Something has gone wrong. Stop reading and wait for status + self.finished = true; + return; } if (self.finished) { self.push(null); @@ -237,10 +240,6 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { client_batch[grpc.opType.RECV_MESSAGE] = true; client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(client_batch, function(err, response) { - if (err) { - callback(err); - return; - } emitter.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -248,6 +247,12 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { error.metadata = response.status.metadata; callback(error); return; + } else { + if (err) { + // Got a batch error, but OK status. Something went wrong + callback(err); + return; + } } emitter.emit('metadata', response.metadata); callback(null, deserialize(response.read)); @@ -300,7 +305,8 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; call.startBatch(metadata_batch, function(err, response) { if (err) { - callback(err); + // The call has stopped for some reason. A non-OK status will arrive + // in the other batch. return; } stream.emit('metadata', response.metadata); @@ -309,10 +315,6 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { client_batch[grpc.opType.RECV_MESSAGE] = true; client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(client_batch, function(err, response) { - if (err) { - callback(err); - return; - } stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -320,6 +322,12 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { error.metadata = response.status.metadata; callback(error); return; + } else { + if (err) { + // Got a batch error, but OK status. Something went wrong + callback(err); + return; + } } callback(null, deserialize(response.read)); }); @@ -373,16 +381,15 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; call.startBatch(start_batch, function(err, response) { if (err) { - throw err; + // The call has stopped for some reason. A non-OK status will arrive + // in the other batch. + return; } stream.emit('metadata', response.metadata); }); var status_batch = {}; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(status_batch, function(err, response) { - if (err) { - throw err; - } stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -390,6 +397,12 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { error.metadata = response.status.metadata; stream.emit('error', error); return; + } else { + if (err) { + // Got a batch error, but OK status. Something went wrong + stream.emit('error', err); + return; + } } }); }); @@ -438,16 +451,15 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; call.startBatch(start_batch, function(err, response) { if (err) { - throw err; + // The call has stopped for some reason. A non-OK status will arrive + // in the other batch. + return; } stream.emit('metadata', response.metadata); }); var status_batch = {}; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(status_batch, function(err, response) { - if (err) { - throw err; - } stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -455,6 +467,12 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { error.metadata = response.status.metadata; stream.emit('error', error); return; + } else { + if (err) { + // Got a batch error, but OK status. Something went wrong + stream.emit('error', err); + return; + } } }); }); |