aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Tim Emiola <tbetbetbe@users.noreply.github.com>2015-01-26 15:58:39 -0800
committerGravatar Tim Emiola <tbetbetbe@users.noreply.github.com>2015-01-26 15:58:39 -0800
commit49724152edc6907fa4b872f9e7580d4b78a06f77 (patch)
tree4e180912186dcf3e8a961aac249b22cb72833fc0 /src
parent7d8dcd719d8972e70f124d237af4f662c9362f15 (diff)
parent12958f9f256d91e35e53d4eeb55609aeaae2efd8 (diff)
Merge pull request #221 from murgatroid99/node_surface_cancellation
Node surface cancellation
Diffstat (limited to 'src')
-rw-r--r--src/node/client.js8
-rw-r--r--src/node/server.js1
-rw-r--r--src/node/surface_client.js23
-rw-r--r--src/node/surface_server.js9
-rw-r--r--src/node/test/client_server_test.js28
-rw-r--r--src/node/test/surface_test.js53
6 files changed, 122 insertions, 0 deletions
diff --git a/src/node/client.js b/src/node/client.js
index 7007852b93..3a1c9eef84 100644
--- a/src/node/client.js
+++ b/src/node/client.js
@@ -161,6 +161,14 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
};
/**
+ * Cancel the ongoing call. If the call has not already finished, it will finish
+ * with status CANCELLED.
+ */
+GrpcClientStream.prototype.cancel = function() {
+ this._call.cancel();
+};
+
+/**
* Make a request on the channel to the given method with the given arguments
* @param {grpc.Channel} channel The channel on which to make the request
* @param {string} method The method to request
diff --git a/src/node/server.js b/src/node/server.js
index fe50acb5a1..03cdbe6f98 100644
--- a/src/node/server.js
+++ b/src/node/server.js
@@ -246,6 +246,7 @@ function Server(options) {
call.serverAccept(function(event) {
if (event.data.code === grpc.status.CANCELLED) {
cancelled = true;
+ stream.emit('cancelled');
}
}, 0);
call.serverEndInitialMetadata(0);
diff --git a/src/node/surface_client.js b/src/node/surface_client.js
index b63ae13e8d..16c31809f4 100644
--- a/src/node/surface_client.js
+++ b/src/node/surface_client.js
@@ -129,6 +129,16 @@ function _write(chunk, encoding, callback) {
ClientWritableObjectStream.prototype._write = _write;
/**
+ * Cancel the underlying call
+ */
+function cancel() {
+ this._stream.cancel();
+}
+
+ClientReadableObjectStream.prototype.cancel = cancel;
+ClientWritableObjectStream.prototype.cancel = cancel;
+
+/**
* Get a function that can make unary requests to the specified method.
* @param {string} method The name of the method to request
* @param {function(*):Buffer} serialize The serialization function for inputs
@@ -155,6 +165,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
var stream = client.makeRequest(this.channel, method, serialize,
deserialize, metadata, deadline);
var emitter = new EventEmitter();
+ emitter.cancel = function cancel() {
+ stream.cancel();
+ };
forwardEvent(stream, emitter, 'status');
forwardEvent(stream, emitter, 'metadata');
stream.write(argument);
@@ -166,6 +179,11 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
callback(e);
}
});
+ stream.on('status', function forwardStatus(status) {
+ if (status.code !== client.status.OK) {
+ callback(status);
+ }
+ });
return emitter;
}
return makeUnaryRequest;
@@ -203,6 +221,11 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
callback(e);
}
});
+ stream.on('status', function forwardStatus(status) {
+ if (status.code !== client.status.OK) {
+ callback(status);
+ }
+ });
return obj_stream;
}
return makeClientStreamRequest;
diff --git a/src/node/surface_server.js b/src/node/surface_server.js
index 07c5339f62..af23ec211c 100644
--- a/src/node/surface_server.js
+++ b/src/node/surface_server.js
@@ -63,6 +63,9 @@ function ServerReadableObjectStream(stream) {
get: function() { return stream.cancelled; }
});
var self = this;
+ this._stream.on('cancelled', function() {
+ self.emit('cancelled');
+ });
this._stream.on('data', function forwardData(chunk) {
if (!self.push(chunk)) {
self._stream.pause();
@@ -100,6 +103,9 @@ function ServerWritableObjectStream(stream) {
var options = {objectMode: true};
Writable.call(this, options);
this._stream = stream;
+ this._stream.on('cancelled', function() {
+ self.emit('cancelled');
+ });
this.on('finish', function() {
this._stream.end();
});
@@ -138,6 +144,9 @@ function makeUnaryHandler(handler) {
Object.defineProperty(call, 'cancelled', {
get: function() { return stream.cancelled;}
});
+ stream.on('cancelled', function() {
+ call.emit('cancelled');
+ });
handler(call, function sendUnaryData(err, value) {
if (err) {
stream.emit('error', err);
diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js
index 2a25908684..99438a1659 100644
--- a/src/node/test/client_server_test.js
+++ b/src/node/test/client_server_test.js
@@ -77,6 +77,14 @@ function errorHandler(stream) {
};
}
+/**
+ * Wait for a cancellation instead of responding
+ * @param {Stream} stream
+ */
+function cancelHandler(stream) {
+ // do nothing
+}
+
describe('echo client', function() {
it('should receive echo responses', function(done) {
var server = new Server();
@@ -125,6 +133,26 @@ describe('echo client', function() {
done();
});
});
+ it('should be able to cancel a call', function(done) {
+ var server = new Server();
+ var port_num = server.bind('0.0.0.0:0');
+ server.register('cancellation', cancelHandler);
+ server.start();
+
+ var channel = new grpc.Channel('localhost:' + port_num);
+ var stream = client.makeRequest(
+ channel,
+ 'cancellation',
+ null,
+ getDeadline(1));
+
+ stream.cancel();
+ stream.on('status', function(status) {
+ assert.equal(status.code, grpc.status.CANCELLED);
+ server.shutdown();
+ done();
+ });
+ });
});
/* TODO(mlumish): explore options for reducing duplication between this test
* and the insecure echo client test */
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 34f1a156eb..16e4869d83 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -35,6 +35,8 @@ var assert = require('assert');
var surface_server = require('../surface_server.js');
+var surface_client = require('../surface_client.js');
+
var ProtoBuf = require('protobufjs');
var grpc = require('..');
@@ -73,3 +75,54 @@ describe('Surface server constructor', function() {
}, /math.Math/);
});
});
+describe('Surface client', function() {
+ var client;
+ var server;
+ before(function() {
+ var Server = grpc.buildServer([mathService]);
+ server = new Server({
+ 'math.Math': {
+ 'div': function(stream) {},
+ 'divMany': function(stream) {},
+ 'fib': function(stream) {},
+ 'sum': function(stream) {}
+ }
+ });
+ var port = server.bind('localhost:0');
+ var Client = surface_client.makeClientConstructor(mathService);
+ client = new Client('localhost:' + port);
+ });
+ after(function() {
+ server.shutdown();
+ });
+ it('Should correctly cancel a unary call', function(done) {
+ var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) {
+ assert.strictEqual(err.code, surface_client.status.CANCELLED);
+ done();
+ });
+ call.cancel();
+ });
+ it('Should correctly cancel a client stream call', function(done) {
+ var call = client.sum(function(err, resp) {
+ assert.strictEqual(err.code, surface_client.status.CANCELLED);
+ done();
+ });
+ call.cancel();
+ });
+ it('Should correctly cancel a server stream call', function(done) {
+ var call = client.fib({'limit': 5});
+ call.on('status', function(status) {
+ assert.strictEqual(status.code, surface_client.status.CANCELLED);
+ done();
+ });
+ call.cancel();
+ });
+ it('Should correctly cancel a bidi stream call', function(done) {
+ var call = client.divMany();
+ call.on('status', function(status) {
+ assert.strictEqual(status.code, surface_client.status.CANCELLED);
+ done();
+ });
+ call.cancel();
+ });
+});