aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node/src/client.js
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2016-03-04 14:54:10 -0800
committerGravatar murgatroid99 <mlumish@google.com>2016-03-04 14:54:10 -0800
commit9adecb06e0bc66ffc98aafa087f946ee42473c01 (patch)
treef34d170bbf51483773312a4af9156f0fbb272545 /src/node/src/client.js
parent929ddb01e3676bd47a8b1e3c94ab08874ce2a4cb (diff)
Fix race between parsing messages and receiving status in Node client
Diffstat (limited to 'src/node/src/client.js')
-rw-r--r--src/node/src/client.js114
1 files changed, 84 insertions, 30 deletions
diff --git a/src/node/src/client.js b/src/node/src/client.js
index c65dd73650..9acf51bd98 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -131,9 +131,69 @@ function ClientReadableStream(call, deserialize) {
this.finished = false;
this.reading = false;
this.deserialize = common.wrapIgnoreNull(deserialize);
+ /* Status generated from reading messages from the server. Overrides the
+ * status from the server if not OK */
+ this.read_status = null;
+ /* Status received from the server. */
+ this.received_status = null;
}
/**
+ * Called when all messages from the server have been processed. The status
+ * parameter indicates that the call should end with that status. status
+ * defaults to OK if not provided.
+ * @param {Object!} status The status that the call should end with
+ */
+function _readsDone(status) {
+ /* jshint validthis: true */
+ if (!status) {
+ status = {code: grpc.status.OK, details: 'OK'};
+ }
+ this.finished = true;
+ this.read_status = status;
+ this._emitStatusIfDone();
+}
+
+ClientReadableStream.prototype._readsDone = _readsDone;
+
+/**
+ * Called to indicate that we have received a status from the server.
+ */
+function _receiveStatus(status) {
+ /* jshint validthis: true */
+ this.received_status = status;
+ this._emitStatusIfDone();
+}
+
+ClientReadableStream.prototype._receiveStatus = _receiveStatus;
+
+/**
+ * If we have both processed all incoming messages and received the status from
+ * the server, emit the status. Otherwise, do nothing.
+ */
+function _emitStatusIfDone() {
+ /* jshint validthis: true */
+ var status;
+ if (this.read_status && this.received_status) {
+ if (this.read_status.code !== grpc.status.OK) {
+ status = this.read_status;
+ } else {
+ status = this.received_status;
+ }
+ this.emit('status', status);
+ if (status.code !== grpc.status.OK) {
+ var error = new Error(status.details);
+ error.code = status.code;
+ error.metadata = status.metadata;
+ this.emit('error', error);
+ return;
+ }
+ }
+}
+
+ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone;
+
+/**
* Read the next object from the stream.
* @access private
* @param {*} size Ignored because we use objectMode=true
@@ -150,6 +210,7 @@ function _read(size) {
if (err) {
// Something has gone wrong. Stop reading and wait for status
self.finished = true;
+ self._readsDone();
return;
}
var data = event.read;
@@ -157,8 +218,11 @@ function _read(size) {
try {
deserialized = self.deserialize(data);
} catch (e) {
- self.call.cancelWithStatus(grpc.status.INTERNAL,
- 'Failed to parse server response');
+ self._readsDone({code: grpc.status.INTERNAL,
+ details: 'Failed to parse server response'});
+ }
+ if (data === null) {
+ self._readsDone();
}
if (self.push(deserialized) && data !== null) {
var read_batch = {};
@@ -198,6 +262,11 @@ function ClientDuplexStream(call, serialize, deserialize) {
this.serialize = common.wrapIgnoreNull(serialize);
this.deserialize = common.wrapIgnoreNull(deserialize);
this.call = call;
+ /* Status generated from reading messages from the server. Overrides the
+ * status from the server if not OK */
+ this.read_status = null;
+ /* Status received from the server. */
+ this.received_status = null;
this.on('finish', function() {
var batch = {};
batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
@@ -205,6 +274,9 @@ function ClientDuplexStream(call, serialize, deserialize) {
});
}
+ClientDuplexStream.prototype._readsDone = _readsDone;
+ClientDuplexStream.prototype._receiveStatus = _receiveStatus;
+ClientDuplexStream.prototype._emitStatusIfDone = _emitStatusIfDone;
ClientDuplexStream.prototype._read = _read;
ClientDuplexStream.prototype._write = _write;
@@ -487,22 +559,13 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
- response.status.metadata = Metadata._fromCoreRepresentation(
- response.status.metadata);
- stream.emit('status', response.status);
- if (response.status.code !== grpc.status.OK) {
- var error = new Error(response.status.details);
- error.code = response.status.code;
- error.metadata = response.status.metadata;
- stream.emit('error', error);
+ if (err) {
+ stream.emit('error', err);
return;
- } else {
- if (err) {
- // Got a batch error, but OK status. Something went wrong
- stream.emit('error', err);
- return;
- }
}
+ response.status.metadata = Metadata._fromCoreRepresentation(
+ response.status.metadata);
+ stream._receiveStatus(response.status);
});
return stream;
}
@@ -552,22 +615,13 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
- response.status.metadata = Metadata._fromCoreRepresentation(
- response.status.metadata);
- stream.emit('status', response.status);
- if (response.status.code !== grpc.status.OK) {
- var error = new Error(response.status.details);
- error.code = response.status.code;
- error.metadata = response.status.metadata;
- stream.emit('error', error);
+ if (err) {
+ stream.emit('error', err);
return;
- } else {
- if (err) {
- // Got a batch error, but OK status. Something went wrong
- stream.emit('error', err);
- return;
- }
}
+ response.status.metadata = Metadata._fromCoreRepresentation(
+ response.status.metadata);
+ stream._receiveStatus(response.status);
});
return stream;
}