diff options
Diffstat (limited to 'src/node')
-rw-r--r-- | src/node/client.js | 6 | ||||
-rw-r--r-- | src/node/examples/math_server.js | 7 | ||||
-rw-r--r-- | src/node/interop/interop_client.js | 2 | ||||
-rw-r--r-- | src/node/server.js | 4 | ||||
-rw-r--r-- | src/node/surface_client.js | 113 | ||||
-rw-r--r-- | src/node/surface_server.js | 138 | ||||
-rw-r--r-- | src/node/test/interop_sanity_test.js | 5 |
7 files changed, 86 insertions, 189 deletions
diff --git a/src/node/client.js b/src/node/client.js index 2fefd14bbc..7007852b93 100644 --- a/src/node/client.js +++ b/src/node/client.js @@ -105,7 +105,7 @@ function GrpcClientStream(call, serialize, deserialize) { return; } var data = event.data; - if (self.push(data) && data != null) { + if (self.push(self.deserialize(data)) && data != null) { self._call.startRead(readCallback); } else { reading = false; @@ -155,7 +155,7 @@ GrpcClientStream.prototype._read = function(size) { */ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { var self = this; - self._call.startWrite(chunk, function(event) { + self._call.startWrite(self.serialize(chunk), function(event) { callback(); }, 0); }; @@ -185,7 +185,7 @@ function makeRequest(channel, if (metadata) { call.addMetadata(metadata); } - return new GrpcClientStream(call); + return new GrpcClientStream(call, serialize, deserialize); } /** diff --git a/src/node/examples/math_server.js b/src/node/examples/math_server.js index d649b4fd6d..e65cfe3002 100644 --- a/src/node/examples/math_server.js +++ b/src/node/examples/math_server.js @@ -52,7 +52,8 @@ var Server = grpc.buildServer([math.Math.service]); */ function mathDiv(call, cb) { var req = call.request; - if (req.divisor == 0) { + // Unary + is explicit coersion to integer + if (+req.divisor === 0) { cb(new Error('cannot divide by zero')); } cb(null, { @@ -89,7 +90,7 @@ function mathSum(call, cb) { // Here, call is a standard readable Node object Stream var sum = 0; call.on('data', function(data) { - sum += data.num | 0; + sum += (+data.num); }); call.on('end', function() { cb(null, {num: sum}); @@ -104,7 +105,7 @@ function mathDivMany(stream) { Transform.call(this, options); } DivTransform.prototype._transform = function(div_args, encoding, callback) { - if (div_args.divisor == 0) { + if (+div_args.divisor === 0) { callback(new Error('cannot divide by zero')); } callback(null, { diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index cf75b9a77a..9306317b68 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -183,7 +183,7 @@ function pingPong(client, done) { assert.equal(response.payload.body.limit - response.payload.body.offset, response_sizes[index]); index += 1; - if (index == 4) { + if (index === 4) { call.end(); } else { call.write({ diff --git a/src/node/server.js b/src/node/server.js index eca20aa5fd..fe50acb5a1 100644 --- a/src/node/server.js +++ b/src/node/server.js @@ -151,7 +151,7 @@ function GrpcServerStream(call, serialize, deserialize) { return; } var data = event.data; - if (self.push(deserialize(data)) && data != null) { + if (self.push(self.deserialize(data)) && data != null) { self._call.startRead(readCallback); } else { reading = false; @@ -233,7 +233,7 @@ function Server(options) { function handleNewCall(event) { var call = event.call; var data = event.data; - if (data == null) { + if (data === null) { return; } server.requestCall(handleNewCall); diff --git a/src/node/surface_client.js b/src/node/surface_client.js index 996e3d101f..b63ae13e8d 100644 --- a/src/node/surface_client.js +++ b/src/node/surface_client.js @@ -63,114 +63,70 @@ util.inherits(ClientReadableObjectStream, Readable); * client side. Extends from stream.Readable. * @constructor * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options */ -function ClientReadableObjectStream(stream, deserialize, options) { - options = _.extend(options, {objectMode: true}); +function ClientReadableObjectStream(stream) { + var options = {objectMode: true}; Readable.call(this, options); this._stream = stream; var self = this; forwardEvent(stream, this, 'status'); forwardEvent(stream, this, 'metadata'); this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { + if (!self.push(chunk)) { self._stream.pause(); } }); this._stream.pause(); } -util.inherits(ClientWritableObjectStream, Writable); - /** - * Class for representing a gRPC client streaming call as a Node stream on the - * client side. Extends from stream.Writable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {object} options Stream options + * _read implementation for both types of streams that allow reading. + * @this {ClientReadableObjectStream} + * @param {number} size Ignored */ -function ClientWritableObjectStream(stream, serialize, options) { - options = _.extend(options, {objectMode: true}); - Writable.call(this, options); - this._stream = stream; - this._serialize = serialize; - forwardEvent(stream, this, 'status'); - forwardEvent(stream, this, 'metadata'); - this.on('finish', function() { - this._stream.end(); - }); +function _read(size) { + this._stream.resume(); } +/** + * See docs for _read + */ +ClientReadableObjectStream.prototype._read = _read; -util.inherits(ClientBidiObjectStream, Duplex); +util.inherits(ClientWritableObjectStream, Writable); /** - * Class for representing a gRPC bidi streaming call as a Node stream on the - * client side. Extends from stream.Duplex. + * Class for representing a gRPC client streaming call as a Node stream on the + * client side. Extends from stream.Writable. * @constructor * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options */ -function ClientBidiObjectStream(stream, serialize, deserialize, options) { - options = _.extend(options, {objectMode: true}); - Duplex.call(this, options); +function ClientWritableObjectStream(stream) { + var options = {objectMode: true}; + Writable.call(this, options); this._stream = stream; - this._serialize = serialize; - var self = this; forwardEvent(stream, this, 'status'); forwardEvent(stream, this, 'metadata'); - this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { - self._stream.pause(); - } - }); - this._stream.pause(); this.on('finish', function() { this._stream.end(); }); } /** - * _read implementation for both types of streams that allow reading. - * @this {ClientReadableObjectStream|ClientBidiObjectStream} - * @param {number} size Ignored - */ -function _read(size) { - this._stream.resume(); -} - -/** - * See docs for _read - */ -ClientReadableObjectStream.prototype._read = _read; -/** - * See docs for _read - */ -ClientBidiObjectStream.prototype._read = _read; - -/** * _write implementation for both types of streams that allow writing - * @this {ClientWritableObjectStream|ClientBidiObjectStream} + * @this {ClientWritableObjectStream} * @param {*} chunk The value to write to the stream * @param {string} encoding Ignored * @param {function(Error)} callback Callback to call when finished writing */ function _write(chunk, encoding, callback) { - this._stream.write(this._serialize(chunk), encoding, callback); + this._stream.write(chunk, encoding, callback); } /** * See docs for _write */ ClientWritableObjectStream.prototype._write = _write; -/** - * See docs for _write - */ -ClientBidiObjectStream.prototype._write = _write; /** * Get a function that can make unary requests to the specified method. @@ -196,15 +152,16 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeUnaryRequest(argument, callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); + var stream = client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); var emitter = new EventEmitter(); forwardEvent(stream, emitter, 'status'); forwardEvent(stream, emitter, 'metadata'); - stream.write(serialize(argument)); + stream.write(argument); stream.end(); stream.on('data', function forwardData(chunk) { try { - callback(null, deserialize(chunk)); + callback(null, chunk); } catch (e) { callback(e); } @@ -236,11 +193,12 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeClientStreamRequest(callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); - var obj_stream = new ClientWritableObjectStream(stream, serialize, {}); + var stream = client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); + var obj_stream = new ClientWritableObjectStream(stream); stream.on('data', function forwardData(chunk) { try { - callback(null, deserialize(chunk)); + callback(null, chunk); } catch (e) { callback(e); } @@ -272,9 +230,10 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeServerStreamRequest(argument, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); - var obj_stream = new ClientReadableObjectStream(stream, deserialize, {}); - stream.write(serialize(argument)); + var stream = client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); + var obj_stream = new ClientReadableObjectStream(stream); + stream.write(argument); stream.end(); return obj_stream; } @@ -301,12 +260,8 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeBidiStreamRequest(metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); - var obj_stream = new ClientBidiObjectStream(stream, - serialize, - deserialize, - {}); - return obj_stream; + return client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); } return makeBidiStreamRequest; } diff --git a/src/node/surface_server.js b/src/node/surface_server.js index bc688839fe..07c5339f62 100644 --- a/src/node/surface_server.js +++ b/src/node/surface_server.js @@ -54,11 +54,9 @@ util.inherits(ServerReadableObjectStream, Readable); * server side. Extends from stream.Readable. * @constructor * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options */ -function ServerReadableObjectStream(stream, deserialize, options) { - options = _.extend(options, {objectMode: true}); +function ServerReadableObjectStream(stream) { + var options = {objectMode: true}; Readable.call(this, options); this._stream = stream; Object.defineProperty(this, 'cancelled', { @@ -66,7 +64,7 @@ function ServerReadableObjectStream(stream, deserialize, options) { }); var self = this; this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { + if (!self.push(chunk)) { self._stream.pause(); } }); @@ -76,57 +74,6 @@ function ServerReadableObjectStream(stream, deserialize, options) { this._stream.pause(); } -util.inherits(ServerWritableObjectStream, Writable); - -/** - * Class for representing a gRPC server streaming call as a Node stream on the - * server side. Extends from stream.Writable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {object} options Stream options - */ -function ServerWritableObjectStream(stream, serialize, options) { - options = _.extend(options, {objectMode: true}); - Writable.call(this, options); - this._stream = stream; - this._serialize = serialize; - this.on('finish', function() { - this._stream.end(); - }); -} - -util.inherits(ServerBidiObjectStream, Duplex); - -/** - * Class for representing a gRPC bidi streaming call as a Node stream on the - * server side. Extends from stream.Duplex. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options - */ -function ServerBidiObjectStream(stream, serialize, deserialize, options) { - options = _.extend(options, {objectMode: true}); - Duplex.call(this, options); - this._stream = stream; - this._serialize = serialize; - var self = this; - this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { - self._stream.pause(); - } - }); - this._stream.on('end', function forwardEnd() { - self.push(null); - }); - this._stream.pause(); - this.on('finish', function() { - this._stream.end(); - }); -} - /** * _read implementation for both types of streams that allow reading. * @this {ServerReadableObjectStream|ServerBidiObjectStream} @@ -140,39 +87,46 @@ function _read(size) { * See docs for _read */ ServerReadableObjectStream.prototype._read = _read; + +util.inherits(ServerWritableObjectStream, Writable); + /** - * See docs for _read + * Class for representing a gRPC server streaming call as a Node stream on the + * server side. Extends from stream.Writable. + * @constructor + * @param {stream} stream Underlying binary Duplex stream for the call */ -ServerBidiObjectStream.prototype._read = _read; +function ServerWritableObjectStream(stream) { + var options = {objectMode: true}; + Writable.call(this, options); + this._stream = stream; + this.on('finish', function() { + this._stream.end(); + }); +} /** * _write implementation for both types of streams that allow writing - * @this {ServerWritableObjectStream|ServerBidiObjectStream} + * @this {ServerWritableObjectStream} * @param {*} chunk The value to write to the stream * @param {string} encoding Ignored * @param {function(Error)} callback Callback to call when finished writing */ function _write(chunk, encoding, callback) { - this._stream.write(this._serialize(chunk), encoding, callback); + this._stream.write(chunk, encoding, callback); } /** * See docs for _write */ ServerWritableObjectStream.prototype._write = _write; -/** - * See docs for _write - */ -ServerBidiObjectStream.prototype._write = _write; /** * Creates a binary stream handler function from a unary handler function * @param {function(Object, function(Error, *))} handler Unary call handler - * @param {function(*):Buffer} serialize Serialization function - * @param {function(Buffer):*} deserialize Deserialization function * @return {function(stream)} Binary stream handler */ -function makeUnaryHandler(handler, serialize, deserialize) { +function makeUnaryHandler(handler) { /** * Handles a stream by reading a single data value, passing it to the handler, * and writing the response back to the stream. @@ -180,7 +134,7 @@ function makeUnaryHandler(handler, serialize, deserialize) { */ return function handleUnaryCall(stream) { stream.on('data', function handleUnaryData(value) { - var call = {request: deserialize(value)}; + var call = {request: value}; Object.defineProperty(call, 'cancelled', { get: function() { return stream.cancelled;} }); @@ -188,7 +142,7 @@ function makeUnaryHandler(handler, serialize, deserialize) { if (err) { stream.emit('error', err); } else { - stream.write(serialize(value)); + stream.write(value); stream.end(); } }); @@ -201,23 +155,21 @@ function makeUnaryHandler(handler, serialize, deserialize) { * function * @param {function(Readable, function(Error, *))} handler Client stream call * handler - * @param {function(*):Buffer} serialize Serialization function - * @param {function(Buffer):*} deserialize Deserialization function * @return {function(stream)} Binary stream handler */ -function makeClientStreamHandler(handler, serialize, deserialize) { +function makeClientStreamHandler(handler) { /** * Handles a stream by passing a deserializing stream to the handler and * writing the response back to the stream. * @param {stream} stream Binary data stream */ return function handleClientStreamCall(stream) { - var object_stream = new ServerReadableObjectStream(stream, deserialize, {}); + var object_stream = new ServerReadableObjectStream(stream); handler(object_stream, function sendClientStreamData(err, value) { if (err) { stream.emit('error', err); } else { - stream.write(serialize(value)); + stream.write(value); stream.end(); } }); @@ -228,11 +180,9 @@ function makeClientStreamHandler(handler, serialize, deserialize) { * Creates a binary stream handler function from a server stream handler * function * @param {function(Writable)} handler Server stream call handler - * @param {function(*):Buffer} serialize Serialization function - * @param {function(Buffer):*} deserialize Deserialization function * @return {function(stream)} Binary stream handler */ -function makeServerStreamHandler(handler, serialize, deserialize) { +function makeServerStreamHandler(handler) { /** * Handles a stream by attaching it to a serializing stream, and passing it to * the handler. @@ -240,10 +190,8 @@ function makeServerStreamHandler(handler, serialize, deserialize) { */ return function handleServerStreamCall(stream) { stream.on('data', function handleClientData(value) { - var object_stream = new ServerWritableObjectStream(stream, - serialize, - {}); - object_stream.request = deserialize(value); + var object_stream = new ServerWritableObjectStream(stream); + object_stream.request = value; handler(object_stream); }); }; @@ -252,23 +200,10 @@ function makeServerStreamHandler(handler, serialize, deserialize) { /** * Creates a binary stream handler function from a bidi stream handler function * @param {function(Duplex)} handler Unary call handler - * @param {function(*):Buffer} serialize Serialization function - * @param {function(Buffer):*} deserialize Deserialization function * @return {function(stream)} Binary stream handler */ -function makeBidiStreamHandler(handler, serialize, deserialize) { - /** - * Handles a stream by wrapping it in a serializing and deserializing object - * stream, and passing it to the handler. - * @param {stream} stream Binary data stream - */ - return function handleBidiStreamCall(stream) { - var object_stream = new ServerBidiObjectStream(stream, - serialize, - deserialize, - {}); - handler(object_stream); - }; +function makeBidiStreamHandler(handler) { + return handler; } /** @@ -341,10 +276,13 @@ function makeServerConstructor(services) { common.fullyQualifiedName(method) + ' not provided.'); } var binary_handler = handler_makers[method_type]( - service_handlers[service_name][decapitalize(method.name)], - common.serializeCls(method.resolvedResponseType.build()), - common.deserializeCls(method.resolvedRequestType.build())); - server.register(prefix + capitalize(method.name), binary_handler); + service_handlers[service_name][decapitalize(method.name)]); + var serialize = common.serializeCls( + method.resolvedResponseType.build()); + var deserialize = common.deserializeCls( + method.resolvedRequestType.build()); + server.register(prefix + capitalize(method.name), binary_handler, + serialize, deserialize); }); }, this); } diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js index 3c062b9788..6cc7d444cd 100644 --- a/src/node/test/interop_sanity_test.js +++ b/src/node/test/interop_sanity_test.js @@ -48,6 +48,9 @@ describe('Interop tests', function() { port = 'localhost:' + server_obj.port; done(); }); + after(function() { + server.shutdown(); + }); // This depends on not using a binary stream it('should pass empty_unary', function(done) { interop_client.runTest(port, name_override, 'empty_unary', true, done); @@ -65,7 +68,7 @@ describe('Interop tests', function() { it('should pass ping_pong', function(done) { interop_client.runTest(port, name_override, 'ping_pong', true, done); }); - it.skip('should pass empty_stream', function(done) { + it('should pass empty_stream', function(done) { interop_client.runTest(port, name_override, 'empty_stream', true, done); }); }); |