diff options
author | murgatroid99 <michael.lumish@gmail.com> | 2015-01-26 14:11:18 -0800 |
---|---|---|
committer | murgatroid99 <michael.lumish@gmail.com> | 2015-01-26 14:11:18 -0800 |
commit | 55dd2ba908fc3cb927d829e24983dc4132e6f558 (patch) | |
tree | 793774d50982051640e6f24839704e031a5abb80 | |
parent | d3e95a36327c7a2249434e554846c8e44aa7daa0 (diff) |
Added cancel to client APIs and cancelled event to server APIs
-rw-r--r-- | src/node/client.js | 2 | ||||
-rw-r--r-- | src/node/server.js | 1 | ||||
-rw-r--r-- | src/node/surface_client.js | 10 | ||||
-rw-r--r-- | src/node/surface_server.js | 9 | ||||
-rw-r--r-- | src/node/test/client_server_test.js | 28 | ||||
-rw-r--r-- | src/node/test/surface_test.js | 53 |
6 files changed, 102 insertions, 1 deletions
diff --git a/src/node/client.js b/src/node/client.js index a039b0dcfa..3a1c9eef84 100644 --- a/src/node/client.js +++ b/src/node/client.js @@ -165,7 +165,7 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { * with status CANCELLED. */ GrpcClientStream.prototype.cancel = function() { - self._call.cancel(); + this._call.cancel(); }; /** diff --git a/src/node/server.js b/src/node/server.js index fe50acb5a1..03cdbe6f98 100644 --- a/src/node/server.js +++ b/src/node/server.js @@ -246,6 +246,7 @@ function Server(options) { call.serverAccept(function(event) { if (event.data.code === grpc.status.CANCELLED) { cancelled = true; + stream.emit('cancelled'); } }, 0); call.serverEndInitialMetadata(0); diff --git a/src/node/surface_client.js b/src/node/surface_client.js index f7123450c2..16c31809f4 100644 --- a/src/node/surface_client.js +++ b/src/node/surface_client.js @@ -179,6 +179,11 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { callback(e); } }); + stream.on('status', function forwardStatus(status) { + if (status.code !== client.status.OK) { + callback(status); + } + }); return emitter; } return makeUnaryRequest; @@ -216,6 +221,11 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { callback(e); } }); + stream.on('status', function forwardStatus(status) { + if (status.code !== client.status.OK) { + callback(status); + } + }); return obj_stream; } return makeClientStreamRequest; diff --git a/src/node/surface_server.js b/src/node/surface_server.js index 07c5339f62..af23ec211c 100644 --- a/src/node/surface_server.js +++ b/src/node/surface_server.js @@ -63,6 +63,9 @@ function ServerReadableObjectStream(stream) { get: function() { return stream.cancelled; } }); var self = this; + this._stream.on('cancelled', function() { + self.emit('cancelled'); + }); this._stream.on('data', function forwardData(chunk) { if (!self.push(chunk)) { self._stream.pause(); @@ -100,6 +103,9 @@ function ServerWritableObjectStream(stream) { var options = {objectMode: true}; Writable.call(this, options); this._stream = stream; + this._stream.on('cancelled', function() { + self.emit('cancelled'); + }); this.on('finish', function() { this._stream.end(); }); @@ -138,6 +144,9 @@ function makeUnaryHandler(handler) { Object.defineProperty(call, 'cancelled', { get: function() { return stream.cancelled;} }); + stream.on('cancelled', function() { + call.emit('cancelled'); + }); handler(call, function sendUnaryData(err, value) { if (err) { stream.emit('error', err); diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js index 2a25908684..99438a1659 100644 --- a/src/node/test/client_server_test.js +++ b/src/node/test/client_server_test.js @@ -77,6 +77,14 @@ function errorHandler(stream) { }; } +/** + * Wait for a cancellation instead of responding + * @param {Stream} stream + */ +function cancelHandler(stream) { + // do nothing +} + describe('echo client', function() { it('should receive echo responses', function(done) { var server = new Server(); @@ -125,6 +133,26 @@ describe('echo client', function() { done(); }); }); + it('should be able to cancel a call', function(done) { + var server = new Server(); + var port_num = server.bind('0.0.0.0:0'); + server.register('cancellation', cancelHandler); + server.start(); + + var channel = new grpc.Channel('localhost:' + port_num); + var stream = client.makeRequest( + channel, + 'cancellation', + null, + getDeadline(1)); + + stream.cancel(); + stream.on('status', function(status) { + assert.equal(status.code, grpc.status.CANCELLED); + server.shutdown(); + done(); + }); + }); }); /* TODO(mlumish): explore options for reducing duplication between this test * and the insecure echo client test */ diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 34f1a156eb..16e4869d83 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -35,6 +35,8 @@ var assert = require('assert'); var surface_server = require('../surface_server.js'); +var surface_client = require('../surface_client.js'); + var ProtoBuf = require('protobufjs'); var grpc = require('..'); @@ -73,3 +75,54 @@ describe('Surface server constructor', function() { }, /math.Math/); }); }); +describe('Surface client', function() { + var client; + var server; + before(function() { + var Server = grpc.buildServer([mathService]); + server = new Server({ + 'math.Math': { + 'div': function(stream) {}, + 'divMany': function(stream) {}, + 'fib': function(stream) {}, + 'sum': function(stream) {} + } + }); + var port = server.bind('localhost:0'); + var Client = surface_client.makeClientConstructor(mathService); + client = new Client('localhost:' + port); + }); + after(function() { + server.shutdown(); + }); + it('Should correctly cancel a unary call', function(done) { + var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) { + assert.strictEqual(err.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); + it('Should correctly cancel a client stream call', function(done) { + var call = client.sum(function(err, resp) { + assert.strictEqual(err.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); + it('Should correctly cancel a server stream call', function(done) { + var call = client.fib({'limit': 5}); + call.on('status', function(status) { + assert.strictEqual(status.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); + it('Should correctly cancel a bidi stream call', function(done) { + var call = client.divMany(); + call.on('status', function(status) { + assert.strictEqual(status.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); +}); |