From d3e95a36327c7a2249434e554846c8e44aa7daa0 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 26 Jan 2015 12:57:10 -0800 Subject: Added cancel to surface calls --- src/node/client.js | 8 ++++++++ src/node/surface_client.js | 13 +++++++++++++ 2 files changed, 21 insertions(+) (limited to 'src') diff --git a/src/node/client.js b/src/node/client.js index 7007852b93..a039b0dcfa 100644 --- a/src/node/client.js +++ b/src/node/client.js @@ -160,6 +160,14 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { }, 0); }; +/** + * Cancel the ongoing call. If the call has not already finished, it will finish + * with status CANCELLED. + */ +GrpcClientStream.prototype.cancel = function() { + self._call.cancel(); +}; + /** * Make a request on the channel to the given method with the given arguments * @param {grpc.Channel} channel The channel on which to make the request diff --git a/src/node/surface_client.js b/src/node/surface_client.js index b63ae13e8d..f7123450c2 100644 --- a/src/node/surface_client.js +++ b/src/node/surface_client.js @@ -128,6 +128,16 @@ function _write(chunk, encoding, callback) { */ ClientWritableObjectStream.prototype._write = _write; +/** + * Cancel the underlying call + */ +function cancel() { + this._stream.cancel(); +} + +ClientReadableObjectStream.prototype.cancel = cancel; +ClientWritableObjectStream.prototype.cancel = cancel; + /** * Get a function that can make unary requests to the specified method. * @param {string} method The name of the method to request @@ -155,6 +165,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { var stream = client.makeRequest(this.channel, method, serialize, deserialize, metadata, deadline); var emitter = new EventEmitter(); + emitter.cancel = function cancel() { + stream.cancel(); + }; forwardEvent(stream, emitter, 'status'); forwardEvent(stream, emitter, 'metadata'); stream.write(argument); -- cgit v1.2.3 From 55dd2ba908fc3cb927d829e24983dc4132e6f558 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 26 Jan 2015 14:11:18 -0800 Subject: Added cancel to client APIs and cancelled event to server APIs --- src/node/client.js | 2 +- src/node/server.js | 1 + src/node/surface_client.js | 10 +++++++ src/node/surface_server.js | 9 +++++++ src/node/test/client_server_test.js | 28 ++++++++++++++++++++ src/node/test/surface_test.js | 53 +++++++++++++++++++++++++++++++++++++ 6 files changed, 102 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/node/client.js b/src/node/client.js index a039b0dcfa..3a1c9eef84 100644 --- a/src/node/client.js +++ b/src/node/client.js @@ -165,7 +165,7 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { * with status CANCELLED. */ GrpcClientStream.prototype.cancel = function() { - self._call.cancel(); + this._call.cancel(); }; /** diff --git a/src/node/server.js b/src/node/server.js index fe50acb5a1..03cdbe6f98 100644 --- a/src/node/server.js +++ b/src/node/server.js @@ -246,6 +246,7 @@ function Server(options) { call.serverAccept(function(event) { if (event.data.code === grpc.status.CANCELLED) { cancelled = true; + stream.emit('cancelled'); } }, 0); call.serverEndInitialMetadata(0); diff --git a/src/node/surface_client.js b/src/node/surface_client.js index f7123450c2..16c31809f4 100644 --- a/src/node/surface_client.js +++ b/src/node/surface_client.js @@ -179,6 +179,11 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { callback(e); } }); + stream.on('status', function forwardStatus(status) { + if (status.code !== client.status.OK) { + callback(status); + } + }); return emitter; } return makeUnaryRequest; @@ -216,6 +221,11 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { callback(e); } }); + stream.on('status', function forwardStatus(status) { + if (status.code !== client.status.OK) { + callback(status); + } + }); return obj_stream; } return makeClientStreamRequest; diff --git a/src/node/surface_server.js b/src/node/surface_server.js index 07c5339f62..af23ec211c 100644 --- a/src/node/surface_server.js +++ b/src/node/surface_server.js @@ -63,6 +63,9 @@ function ServerReadableObjectStream(stream) { get: function() { return stream.cancelled; } }); var self = this; + this._stream.on('cancelled', function() { + self.emit('cancelled'); + }); this._stream.on('data', function forwardData(chunk) { if (!self.push(chunk)) { self._stream.pause(); @@ -100,6 +103,9 @@ function ServerWritableObjectStream(stream) { var options = {objectMode: true}; Writable.call(this, options); this._stream = stream; + this._stream.on('cancelled', function() { + self.emit('cancelled'); + }); this.on('finish', function() { this._stream.end(); }); @@ -138,6 +144,9 @@ function makeUnaryHandler(handler) { Object.defineProperty(call, 'cancelled', { get: function() { return stream.cancelled;} }); + stream.on('cancelled', function() { + call.emit('cancelled'); + }); handler(call, function sendUnaryData(err, value) { if (err) { stream.emit('error', err); diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js index 2a25908684..99438a1659 100644 --- a/src/node/test/client_server_test.js +++ b/src/node/test/client_server_test.js @@ -77,6 +77,14 @@ function errorHandler(stream) { }; } +/** + * Wait for a cancellation instead of responding + * @param {Stream} stream + */ +function cancelHandler(stream) { + // do nothing +} + describe('echo client', function() { it('should receive echo responses', function(done) { var server = new Server(); @@ -125,6 +133,26 @@ describe('echo client', function() { done(); }); }); + it('should be able to cancel a call', function(done) { + var server = new Server(); + var port_num = server.bind('0.0.0.0:0'); + server.register('cancellation', cancelHandler); + server.start(); + + var channel = new grpc.Channel('localhost:' + port_num); + var stream = client.makeRequest( + channel, + 'cancellation', + null, + getDeadline(1)); + + stream.cancel(); + stream.on('status', function(status) { + assert.equal(status.code, grpc.status.CANCELLED); + server.shutdown(); + done(); + }); + }); }); /* TODO(mlumish): explore options for reducing duplication between this test * and the insecure echo client test */ diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 34f1a156eb..16e4869d83 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -35,6 +35,8 @@ var assert = require('assert'); var surface_server = require('../surface_server.js'); +var surface_client = require('../surface_client.js'); + var ProtoBuf = require('protobufjs'); var grpc = require('..'); @@ -73,3 +75,54 @@ describe('Surface server constructor', function() { }, /math.Math/); }); }); +describe('Surface client', function() { + var client; + var server; + before(function() { + var Server = grpc.buildServer([mathService]); + server = new Server({ + 'math.Math': { + 'div': function(stream) {}, + 'divMany': function(stream) {}, + 'fib': function(stream) {}, + 'sum': function(stream) {} + } + }); + var port = server.bind('localhost:0'); + var Client = surface_client.makeClientConstructor(mathService); + client = new Client('localhost:' + port); + }); + after(function() { + server.shutdown(); + }); + it('Should correctly cancel a unary call', function(done) { + var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) { + assert.strictEqual(err.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); + it('Should correctly cancel a client stream call', function(done) { + var call = client.sum(function(err, resp) { + assert.strictEqual(err.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); + it('Should correctly cancel a server stream call', function(done) { + var call = client.fib({'limit': 5}); + call.on('status', function(status) { + assert.strictEqual(status.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); + it('Should correctly cancel a bidi stream call', function(done) { + var call = client.divMany(); + call.on('status', function(status) { + assert.strictEqual(status.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); +}); -- cgit v1.2.3