aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar murgatroid99 <michael.lumish@gmail.com>2015-01-26 14:11:18 -0800
committerGravatar murgatroid99 <michael.lumish@gmail.com>2015-01-26 14:11:18 -0800
commit55dd2ba908fc3cb927d829e24983dc4132e6f558 (patch)
tree793774d50982051640e6f24839704e031a5abb80
parentd3e95a36327c7a2249434e554846c8e44aa7daa0 (diff)
Added cancel to client APIs and cancelled event to server APIs
-rw-r--r--src/node/client.js2
-rw-r--r--src/node/server.js1
-rw-r--r--src/node/surface_client.js10
-rw-r--r--src/node/surface_server.js9
-rw-r--r--src/node/test/client_server_test.js28
-rw-r--r--src/node/test/surface_test.js53
6 files changed, 102 insertions, 1 deletions
diff --git a/src/node/client.js b/src/node/client.js
index a039b0dcfa..3a1c9eef84 100644
--- a/src/node/client.js
+++ b/src/node/client.js
@@ -165,7 +165,7 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
* with status CANCELLED.
*/
GrpcClientStream.prototype.cancel = function() {
- self._call.cancel();
+ this._call.cancel();
};
/**
diff --git a/src/node/server.js b/src/node/server.js
index fe50acb5a1..03cdbe6f98 100644
--- a/src/node/server.js
+++ b/src/node/server.js
@@ -246,6 +246,7 @@ function Server(options) {
call.serverAccept(function(event) {
if (event.data.code === grpc.status.CANCELLED) {
cancelled = true;
+ stream.emit('cancelled');
}
}, 0);
call.serverEndInitialMetadata(0);
diff --git a/src/node/surface_client.js b/src/node/surface_client.js
index f7123450c2..16c31809f4 100644
--- a/src/node/surface_client.js
+++ b/src/node/surface_client.js
@@ -179,6 +179,11 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
callback(e);
}
});
+ stream.on('status', function forwardStatus(status) {
+ if (status.code !== client.status.OK) {
+ callback(status);
+ }
+ });
return emitter;
}
return makeUnaryRequest;
@@ -216,6 +221,11 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
callback(e);
}
});
+ stream.on('status', function forwardStatus(status) {
+ if (status.code !== client.status.OK) {
+ callback(status);
+ }
+ });
return obj_stream;
}
return makeClientStreamRequest;
diff --git a/src/node/surface_server.js b/src/node/surface_server.js
index 07c5339f62..af23ec211c 100644
--- a/src/node/surface_server.js
+++ b/src/node/surface_server.js
@@ -63,6 +63,9 @@ function ServerReadableObjectStream(stream) {
get: function() { return stream.cancelled; }
});
var self = this;
+ this._stream.on('cancelled', function() {
+ self.emit('cancelled');
+ });
this._stream.on('data', function forwardData(chunk) {
if (!self.push(chunk)) {
self._stream.pause();
@@ -100,6 +103,9 @@ function ServerWritableObjectStream(stream) {
var options = {objectMode: true};
Writable.call(this, options);
this._stream = stream;
+ this._stream.on('cancelled', function() {
+ self.emit('cancelled');
+ });
this.on('finish', function() {
this._stream.end();
});
@@ -138,6 +144,9 @@ function makeUnaryHandler(handler) {
Object.defineProperty(call, 'cancelled', {
get: function() { return stream.cancelled;}
});
+ stream.on('cancelled', function() {
+ call.emit('cancelled');
+ });
handler(call, function sendUnaryData(err, value) {
if (err) {
stream.emit('error', err);
diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js
index 2a25908684..99438a1659 100644
--- a/src/node/test/client_server_test.js
+++ b/src/node/test/client_server_test.js
@@ -77,6 +77,14 @@ function errorHandler(stream) {
};
}
+/**
+ * Wait for a cancellation instead of responding
+ * @param {Stream} stream
+ */
+function cancelHandler(stream) {
+ // do nothing
+}
+
describe('echo client', function() {
it('should receive echo responses', function(done) {
var server = new Server();
@@ -125,6 +133,26 @@ describe('echo client', function() {
done();
});
});
+ it('should be able to cancel a call', function(done) {
+ var server = new Server();
+ var port_num = server.bind('0.0.0.0:0');
+ server.register('cancellation', cancelHandler);
+ server.start();
+
+ var channel = new grpc.Channel('localhost:' + port_num);
+ var stream = client.makeRequest(
+ channel,
+ 'cancellation',
+ null,
+ getDeadline(1));
+
+ stream.cancel();
+ stream.on('status', function(status) {
+ assert.equal(status.code, grpc.status.CANCELLED);
+ server.shutdown();
+ done();
+ });
+ });
});
/* TODO(mlumish): explore options for reducing duplication between this test
* and the insecure echo client test */
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 34f1a156eb..16e4869d83 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -35,6 +35,8 @@ var assert = require('assert');
var surface_server = require('../surface_server.js');
+var surface_client = require('../surface_client.js');
+
var ProtoBuf = require('protobufjs');
var grpc = require('..');
@@ -73,3 +75,54 @@ describe('Surface server constructor', function() {
}, /math.Math/);
});
});
+describe('Surface client', function() {
+ var client;
+ var server;
+ before(function() {
+ var Server = grpc.buildServer([mathService]);
+ server = new Server({
+ 'math.Math': {
+ 'div': function(stream) {},
+ 'divMany': function(stream) {},
+ 'fib': function(stream) {},
+ 'sum': function(stream) {}
+ }
+ });
+ var port = server.bind('localhost:0');
+ var Client = surface_client.makeClientConstructor(mathService);
+ client = new Client('localhost:' + port);
+ });
+ after(function() {
+ server.shutdown();
+ });
+ it('Should correctly cancel a unary call', function(done) {
+ var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) {
+ assert.strictEqual(err.code, surface_client.status.CANCELLED);
+ done();
+ });
+ call.cancel();
+ });
+ it('Should correctly cancel a client stream call', function(done) {
+ var call = client.sum(function(err, resp) {
+ assert.strictEqual(err.code, surface_client.status.CANCELLED);
+ done();
+ });
+ call.cancel();
+ });
+ it('Should correctly cancel a server stream call', function(done) {
+ var call = client.fib({'limit': 5});
+ call.on('status', function(status) {
+ assert.strictEqual(status.code, surface_client.status.CANCELLED);
+ done();
+ });
+ call.cancel();
+ });
+ it('Should correctly cancel a bidi stream call', function(done) {
+ var call = client.divMany();
+ call.on('status', function(status) {
+ assert.strictEqual(status.code, surface_client.status.CANCELLED);
+ done();
+ });
+ call.cancel();
+ });
+});