aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/node/interop/interop_client.js3
-rw-r--r--src/node/src/client.js114
-rw-r--r--src/node/test/surface_test.js8
-rw-r--r--src/objective-c/GRPCClient/private/GRPCCompletionQueue.h2
-rw-r--r--src/objective-c/GRPCClient/private/GRPCCompletionQueue.m2
5 files changed, 97 insertions, 32 deletions
diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js
index db383e4d00..5602011a8e 100644
--- a/src/node/interop/interop_client.js
+++ b/src/node/interop/interop_client.js
@@ -290,6 +290,7 @@ function timeoutOnSleepingServer(client, done) {
call.write({
payload: {body: zeroBuffer(27182)}
});
+ call.on('data', function() {});
call.on('error', function(error) {
assert(error.code === grpc.status.DEADLINE_EXCEEDED ||
@@ -336,6 +337,7 @@ function customMetadata(client, done) {
['test_initial_metadata_value']);
done();
});
+ stream.on('data', function() {});
stream.on('status', function(status) {
var echo_trailer = status.metadata.get(ECHO_TRAILING_KEY);
assert(echo_trailer.length > 0);
@@ -361,6 +363,7 @@ function statusCodeAndMessage(client, done) {
done();
});
var duplex = client.fullDuplexCall();
+ duplex.on('data', function() {});
duplex.on('status', function(status) {
assert(status);
assert.strictEqual(status.code, 2);
diff --git a/src/node/src/client.js b/src/node/src/client.js
index c65dd73650..9acf51bd98 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -131,9 +131,69 @@ function ClientReadableStream(call, deserialize) {
this.finished = false;
this.reading = false;
this.deserialize = common.wrapIgnoreNull(deserialize);
+ /* Status generated from reading messages from the server. Overrides the
+ * status from the server if not OK */
+ this.read_status = null;
+ /* Status received from the server. */
+ this.received_status = null;
}
/**
+ * Called when all messages from the server have been processed. The status
+ * parameter indicates that the call should end with that status. status
+ * defaults to OK if not provided.
+ * @param {Object!} status The status that the call should end with
+ */
+function _readsDone(status) {
+ /* jshint validthis: true */
+ if (!status) {
+ status = {code: grpc.status.OK, details: 'OK'};
+ }
+ this.finished = true;
+ this.read_status = status;
+ this._emitStatusIfDone();
+}
+
+ClientReadableStream.prototype._readsDone = _readsDone;
+
+/**
+ * Called to indicate that we have received a status from the server.
+ */
+function _receiveStatus(status) {
+ /* jshint validthis: true */
+ this.received_status = status;
+ this._emitStatusIfDone();
+}
+
+ClientReadableStream.prototype._receiveStatus = _receiveStatus;
+
+/**
+ * If we have both processed all incoming messages and received the status from
+ * the server, emit the status. Otherwise, do nothing.
+ */
+function _emitStatusIfDone() {
+ /* jshint validthis: true */
+ var status;
+ if (this.read_status && this.received_status) {
+ if (this.read_status.code !== grpc.status.OK) {
+ status = this.read_status;
+ } else {
+ status = this.received_status;
+ }
+ this.emit('status', status);
+ if (status.code !== grpc.status.OK) {
+ var error = new Error(status.details);
+ error.code = status.code;
+ error.metadata = status.metadata;
+ this.emit('error', error);
+ return;
+ }
+ }
+}
+
+ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone;
+
+/**
* Read the next object from the stream.
* @access private
* @param {*} size Ignored because we use objectMode=true
@@ -150,6 +210,7 @@ function _read(size) {
if (err) {
// Something has gone wrong. Stop reading and wait for status
self.finished = true;
+ self._readsDone();
return;
}
var data = event.read;
@@ -157,8 +218,11 @@ function _read(size) {
try {
deserialized = self.deserialize(data);
} catch (e) {
- self.call.cancelWithStatus(grpc.status.INTERNAL,
- 'Failed to parse server response');
+ self._readsDone({code: grpc.status.INTERNAL,
+ details: 'Failed to parse server response'});
+ }
+ if (data === null) {
+ self._readsDone();
}
if (self.push(deserialized) && data !== null) {
var read_batch = {};
@@ -198,6 +262,11 @@ function ClientDuplexStream(call, serialize, deserialize) {
this.serialize = common.wrapIgnoreNull(serialize);
this.deserialize = common.wrapIgnoreNull(deserialize);
this.call = call;
+ /* Status generated from reading messages from the server. Overrides the
+ * status from the server if not OK */
+ this.read_status = null;
+ /* Status received from the server. */
+ this.received_status = null;
this.on('finish', function() {
var batch = {};
batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
@@ -205,6 +274,9 @@ function ClientDuplexStream(call, serialize, deserialize) {
});
}
+ClientDuplexStream.prototype._readsDone = _readsDone;
+ClientDuplexStream.prototype._receiveStatus = _receiveStatus;
+ClientDuplexStream.prototype._emitStatusIfDone = _emitStatusIfDone;
ClientDuplexStream.prototype._read = _read;
ClientDuplexStream.prototype._write = _write;
@@ -487,22 +559,13 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
- response.status.metadata = Metadata._fromCoreRepresentation(
- response.status.metadata);
- stream.emit('status', response.status);
- if (response.status.code !== grpc.status.OK) {
- var error = new Error(response.status.details);
- error.code = response.status.code;
- error.metadata = response.status.metadata;
- stream.emit('error', error);
+ if (err) {
+ stream.emit('error', err);
return;
- } else {
- if (err) {
- // Got a batch error, but OK status. Something went wrong
- stream.emit('error', err);
- return;
- }
}
+ response.status.metadata = Metadata._fromCoreRepresentation(
+ response.status.metadata);
+ stream._receiveStatus(response.status);
});
return stream;
}
@@ -552,22 +615,13 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
- response.status.metadata = Metadata._fromCoreRepresentation(
- response.status.metadata);
- stream.emit('status', response.status);
- if (response.status.code !== grpc.status.OK) {
- var error = new Error(response.status.details);
- error.code = response.status.code;
- error.metadata = response.status.metadata;
- stream.emit('error', error);
+ if (err) {
+ stream.emit('error', err);
return;
- } else {
- if (err) {
- // Got a batch error, but OK status. Something went wrong
- stream.emit('error', err);
- return;
- }
}
+ response.status.metadata = Metadata._fromCoreRepresentation(
+ response.status.metadata);
+ stream._receiveStatus(response.status);
});
return stream;
}
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 530f1f7749..8a232d6fc4 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -1000,6 +1000,7 @@ describe('Call propagation', function() {
proxy_impl.serverStream = function(parent) {
var child = client.serverStream(parent.request, null,
{parent: parent});
+ child.on('data', function() {});
child.on('error', function(err) {
assert(err);
assert.strictEqual(err.code, grpc.status.CANCELLED);
@@ -1013,6 +1014,7 @@ describe('Call propagation', function() {
var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure());
call = proxy_client.serverStream({});
+ call.on('data', function() {});
call.on('error', function(err) {
done();
});
@@ -1022,6 +1024,7 @@ describe('Call propagation', function() {
var call;
proxy_impl.bidiStream = function(parent) {
var child = client.bidiStream(null, {parent: parent});
+ child.on('data', function() {});
child.on('error', function(err) {
assert(err);
assert.strictEqual(err.code, grpc.status.CANCELLED);
@@ -1035,6 +1038,7 @@ describe('Call propagation', function() {
var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure());
call = proxy_client.bidiStream();
+ call.on('data', function() {});
call.on('error', function(err) {
done();
});
@@ -1074,6 +1078,7 @@ describe('Call propagation', function() {
proxy_impl.bidiStream = function(parent) {
var child = client.bidiStream(
null, {parent: parent, propagate_flags: deadline_flags});
+ child.on('data', function() {});
child.on('error', function(err) {
assert(err);
assert(err.code === grpc.status.DEADLINE_EXCEEDED ||
@@ -1089,6 +1094,7 @@ describe('Call propagation', function() {
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 1);
var call = proxy_client.bidiStream(null, {deadline: deadline});
+ call.on('data', function() {});
call.on('error', function(err) {
done();
});
@@ -1130,6 +1136,7 @@ describe('Cancelling surface client', function() {
});
it('Should correctly cancel a server stream call', function(done) {
var call = client.fib({'limit': 5});
+ call.on('data', function() {});
call.on('error', function(error) {
assert.strictEqual(error.code, surface_client.status.CANCELLED);
done();
@@ -1138,6 +1145,7 @@ describe('Cancelling surface client', function() {
});
it('Should correctly cancel a bidi stream call', function(done) {
var call = client.divMany();
+ call.on('data', function() {});
call.on('error', function(error) {
assert.strictEqual(error.code, surface_client.status.CANCELLED);
done();
diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h
index fe3b8f39d1..7b66cd4c32 100644
--- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h
+++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
index ea2b01ee1d..ff3031678c 100644
--- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
+++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without