aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/support/subprocess_posix.c6
-rw-r--r--src/core/surface/call.c56
-rw-r--r--src/cpp/server/server.cc2
-rw-r--r--src/node/src/client.js56
4 files changed, 76 insertions, 44 deletions
diff --git a/src/core/support/subprocess_posix.c b/src/core/support/subprocess_posix.c
index 642520bb47..b4631fa0ed 100644
--- a/src/core/support/subprocess_posix.c
+++ b/src/core/support/subprocess_posix.c
@@ -57,7 +57,7 @@ struct gpr_subprocess {
const char *gpr_subprocess_binary_extension() { return ""; }
-gpr_subprocess *gpr_subprocess_create(int argc, char **argv) {
+gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) {
gpr_subprocess *r;
int pid;
char **exec_args;
@@ -92,7 +92,11 @@ void gpr_subprocess_destroy(gpr_subprocess *p) {
int gpr_subprocess_join(gpr_subprocess *p) {
int status;
+retry:
if (waitpid(p->pid, &status, 0) == -1) {
+ if (errno == EINTR) {
+ goto retry;
+ }
gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno));
return -1;
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index ec6fd65ea3..4d2ba7cd7d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -536,9 +536,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
switch ((grpc_ioreq_op)i) {
case GRPC_IOREQ_RECV_MESSAGE:
case GRPC_IOREQ_SEND_MESSAGE:
- if (master->success) {
- call->request_set[i] = REQSET_EMPTY;
- } else {
+ call->request_set[i] = REQSET_EMPTY;
+ if (!master->success) {
call->write_state = WRITE_STATE_WRITE_CLOSED;
}
break;
@@ -583,11 +582,29 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) {
}
}
+static void early_out_write_ops(grpc_call *call) {
+ switch (call->write_state) {
+ case WRITE_STATE_WRITE_CLOSED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
+ /* fallthrough */
+ case WRITE_STATE_STARTED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
+ /* fallthrough */
+ case WRITE_STATE_INITIAL:
+ /* do nothing */
+ break;
+ }
+}
+
static void call_on_done_send(void *pc, int success) {
grpc_call *call = pc;
lock(call);
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success);
+ call->write_state = WRITE_STATE_STARTED;
}
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success);
@@ -596,6 +613,11 @@ static void call_on_done_send(void *pc, int success) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success);
finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success);
finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ }
+ if (!success) {
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ early_out_write_ops(call);
}
call->send_ops.nops = 0;
call->last_send_contains = 0;
@@ -811,7 +833,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
op->send_ops = &call->send_ops;
op->bind_pollset = grpc_cq_pollset(call->cq);
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
- call->write_state = WRITE_STATE_STARTED;
call->send_initial_metadata_count = 0;
/* fall through intended */
case WRITE_STATE_STARTED:
@@ -827,7 +848,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
op->is_last_send = 1;
op->send_ops = &call->send_ops;
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
- call->write_state = WRITE_STATE_WRITE_CLOSED;
if (!call->is_client) {
/* send trailing metadata */
data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
@@ -919,23 +939,6 @@ static void finish_read_ops(grpc_call *call) {
}
}
-static void early_out_write_ops(grpc_call *call) {
- switch (call->write_state) {
- case WRITE_STATE_WRITE_CLOSED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
- /* fallthrough */
- case WRITE_STATE_STARTED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
- /* fallthrough */
- case WRITE_STATE_INITIAL:
- /* do nothing */
- break;
- }
-}
-
static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
size_t nreqs,
grpc_ioreq_completion_func completion,
@@ -1176,6 +1179,10 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, int success, void *tag) {
+ grpc_cq_end_op(call->cq, tag, call, success);
+}
+
+static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, 1);
}
@@ -1186,6 +1193,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t out;
const grpc_op *op;
grpc_ioreq *req;
+ void (*finish_func)(grpc_call *, int, void *) = finish_batch;
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
@@ -1269,6 +1277,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.recv_status_on_client.trailing_metadata;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
+ finish_func = finish_batch_with_close;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
req = &reqs[out++];
@@ -1278,13 +1287,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.recv_close_on_server.cancelled;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
+ finish_func = finish_batch_with_close;
break;
}
}
grpc_cq_begin_op(call->cq, call);
- return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch,
+ return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func,
tag);
}
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 62f4020d7e..e66b4ed2d8 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -149,7 +149,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
}
buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf));
+ cq_.Pluck(&buf); /* status ignored */
void* ignored_tag;
bool ignored_ok;
cq_.Shutdown();
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 46d476b9f4..efec05bbf3 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -81,7 +81,8 @@ function _write(chunk, encoding, callback) {
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
this.call.startBatch(batch, function(err, event) {
if (err) {
- throw err;
+ // Something has gone wrong. Stop writing by failing to call callback
+ return;
}
callback();
});
@@ -120,7 +121,9 @@ function _read(size) {
*/
function readCallback(err, event) {
if (err) {
- throw err;
+ // Something has gone wrong. Stop reading and wait for status
+ self.finished = true;
+ return;
}
if (self.finished) {
self.push(null);
@@ -237,10 +240,6 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
- if (err) {
- callback(err);
- return;
- }
emitter.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@@ -248,6 +247,12 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
error.metadata = response.status.metadata;
callback(error);
return;
+ } else {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ callback(err);
+ return;
+ }
}
emitter.emit('metadata', response.metadata);
callback(null, deserialize(response.read));
@@ -300,7 +305,8 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(metadata_batch, function(err, response) {
if (err) {
- callback(err);
+ // The call has stopped for some reason. A non-OK status will arrive
+ // in the other batch.
return;
}
stream.emit('metadata', response.metadata);
@@ -309,10 +315,6 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
- if (err) {
- callback(err);
- return;
- }
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@@ -320,6 +322,12 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
error.metadata = response.status.metadata;
callback(error);
return;
+ } else {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ callback(err);
+ return;
+ }
}
callback(null, deserialize(response.read));
});
@@ -373,16 +381,15 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
- throw err;
+ // The call has stopped for some reason. A non-OK status will arrive
+ // in the other batch.
+ return;
}
stream.emit('metadata', response.metadata);
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
- if (err) {
- throw err;
- }
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@@ -390,6 +397,12 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
error.metadata = response.status.metadata;
stream.emit('error', error);
return;
+ } else {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ stream.emit('error', err);
+ return;
+ }
}
});
});
@@ -438,16 +451,15 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
- throw err;
+ // The call has stopped for some reason. A non-OK status will arrive
+ // in the other batch.
+ return;
}
stream.emit('metadata', response.metadata);
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
- if (err) {
- throw err;
- }
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@@ -455,6 +467,12 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
error.metadata = response.status.metadata;
stream.emit('error', error);
return;
+ } else {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ stream.emit('error', err);
+ return;
+ }
}
});
});